sys_pipe.c revision 94539
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 94539 2002-04-12 19:38:41Z tmm $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, the
33 * "normal" kernel-buffered path is used.  If the write is between
34 * PIPE_MINDIRECT and PIPE_SIZE in size, the user buffer is fully mapped
35 * and wired into the kernel, and the receiving process copies the data
36 * directly from the sending process's wired pages.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from that
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally avoided.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
51
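/*
 * Editorial sketch (not part of the original revision): the path choice
 * described above reduces to roughly the following test, mirroring the
 * check made in pipe_write() below.  PIPE_MINDIRECT and PIPE_SIZE come
 * from <sys/pipe.h>; LIMITPIPEKVA is defined later in this file.
 *
 *	if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
 *	    (fp->f_flag & FNONBLOCK) == 0 &&
 *	    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)))
 *		-- direct write: wire the user pages, reader copies from them
 *	else
 *		-- buffered write: uiomove() into the pipe's kernel buffer
 */
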
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/malloc.h>
64#include <sys/poll.h>
65#include <sys/selinfo.h>
66#include <sys/signalvar.h>
67#include <sys/sysproto.h>
68#include <sys/pipe.h>
69#include <sys/proc.h>
70#include <sys/vnode.h>
71#include <sys/uio.h>
72#include <sys/event.h>
73
74#include <vm/vm.h>
75#include <vm/vm_param.h>
76#include <vm/vm_object.h>
77#include <vm/vm_kern.h>
78#include <vm/vm_extern.h>
79#include <vm/pmap.h>
80#include <vm/vm_map.h>
81#include <vm/vm_page.h>
82#include <vm/uma.h>
83
84/*
85 * Use this define if you want to disable *fancy* VM things.  Expect an
86 * approx 30% decrease in transfer rate.  This could be useful for
87 * NetBSD or OpenBSD.
88 */
89/* #define PIPE_NODIRECT */
90
91/*
92 * interfaces to the outside world
93 */
94static int pipe_read(struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct thread *td);
96static int pipe_write(struct file *fp, struct uio *uio,
97		struct ucred *cred, int flags, struct thread *td);
98static int pipe_close(struct file *fp, struct thread *td);
99static int pipe_poll(struct file *fp, int events, struct ucred *cred,
100		struct thread *td);
101static int pipe_kqfilter(struct file *fp, struct knote *kn);
102static int pipe_stat(struct file *fp, struct stat *sb, struct thread *td);
103static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td);
104
105static struct fileops pipeops = {
106	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
107	pipe_stat, pipe_close
108};
109
110static void	filt_pipedetach(struct knote *kn);
111static int	filt_piperead(struct knote *kn, long hint);
112static int	filt_pipewrite(struct knote *kn, long hint);
113
114static struct filterops pipe_rfiltops =
115	{ 1, NULL, filt_pipedetach, filt_piperead };
116static struct filterops pipe_wfiltops =
117	{ 1, NULL, filt_pipedetach, filt_pipewrite };
118
119#define PIPE_GET_GIANT(pipe)						\
120	do {								\
121		KASSERT(((pipe)->pipe_state & PIPE_LOCKFL) != 0,	\
122		    ("%s:%d PIPE_GET_GIANT: pipe not locked",		\
123		     __FILE__, __LINE__));				\
124		PIPE_UNLOCK(pipe);					\
125		mtx_lock(&Giant);					\
126	} while (0)
127
128#define PIPE_DROP_GIANT(pipe)						\
129	do {								\
130		mtx_unlock(&Giant);					\
131		PIPE_LOCK(pipe);					\
132	} while (0)
133
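/*
 * Usage note (editorial): PIPE_GET_GIANT() expects the caller to hold the
 * pipe mutex and the pipe I/O lock (PIPE_LOCKFL, taken via pipelock());
 * it drops the pipe mutex and acquires Giant so that VM operations may be
 * performed, and PIPE_DROP_GIANT() reverses those two steps.  See
 * pipe_direct_write() and pipe_clone_write_buffer() for typical pairings.
 */
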
134/*
135 * Default pipe buffer size(s); these can be fairly large now because pipe
136 * space is pageable.  The pipe code will try to maintain locality of
137 * reference for performance reasons, so small amounts of outstanding I/O
138 * will not wipe the cache.
139 */
140#define MINPIPESIZE (PIPE_SIZE/3)
141#define MAXPIPESIZE (2*PIPE_SIZE/3)
142
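/*
 * Editorial example: assuming the usual PIPE_SIZE of 16384 bytes from
 * <sys/pipe.h> (check that header for the authoritative value), the
 * hysteresis points above work out to:
 *
 *	MINPIPESIZE = 16384 / 3     = 5461 bytes
 *	MAXPIPESIZE = 2 * 16384 / 3 = 10922 bytes
 */
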
143/*
144 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
145 * is there so that on large systems, we don't exhaust it.
146 */
147#define MAXPIPEKVA (8*1024*1024)
148
149/*
150 * Limit on pipe kva above which new direct transfers are not set up;
151 * we cannot, of course, limit the amount of kva for pipes in general.
152 */
153#define LIMITPIPEKVA (16*1024*1024)
154
155/*
156 * Limit the number of "big" pipes
157 */
158#define LIMITBIGPIPES	32
159static int nbigpipe;
160
161static int amountpipekva;
162
163static void pipeinit(void *dummy __unused);
164static void pipeclose(struct pipe *cpipe);
165static void pipe_free_kmem(struct pipe *cpipe);
166static int pipe_create(struct pipe **cpipep);
167static __inline int pipelock(struct pipe *cpipe, int catch);
168static __inline void pipeunlock(struct pipe *cpipe);
169static __inline void pipeselwakeup(struct pipe *cpipe);
170#ifndef PIPE_NODIRECT
171static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
172static void pipe_destroy_write_buffer(struct pipe *wpipe);
173static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
174static void pipe_clone_write_buffer(struct pipe *wpipe);
175#endif
176static int pipespace(struct pipe *cpipe, int size);
177
178static uma_zone_t pipe_zone;
179
180SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
181
182static void
183pipeinit(void *dummy __unused)
184{
185	pipe_zone = uma_zcreate("PIPE", sizeof(struct pipe), NULL,
186	    NULL, NULL, NULL, UMA_ALIGN_PTR, 0);
187}
188
189/*
190 * The pipe system call for the DTYPE_PIPE type of pipes
191 */
192
193/* ARGSUSED */
194int
195pipe(td, uap)
196	struct thread *td;
197	struct pipe_args /* {
198		int	dummy;
199	} */ *uap;
200{
201	struct filedesc *fdp = td->td_proc->p_fd;
202	struct file *rf, *wf;
203	struct pipe *rpipe, *wpipe;
204	struct mtx *pmtx;
205	int fd, error;
206
207	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
208
209	pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
210
211	rpipe = wpipe = NULL;
212	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
213		pipeclose(rpipe);
214		pipeclose(wpipe);
215		free(pmtx, M_TEMP);
216		return (ENFILE);
217	}
218
219	rpipe->pipe_state |= PIPE_DIRECTOK;
220	wpipe->pipe_state |= PIPE_DIRECTOK;
221
222	error = falloc(td, &rf, &fd);
223	if (error) {
224		pipeclose(rpipe);
225		pipeclose(wpipe);
226		free(pmtx, M_TEMP);
227		return (error);
228	}
229	fhold(rf);
230	td->td_retval[0] = fd;
231
232	/*
233	 * Warning: once we've gotten past allocation of the fd for the
234	 * read-side, we can only drop the read side via fdrop() in order
235	 * to avoid races against processes which manage to dup() the read
236	 * side while we are blocked trying to allocate the write side.
237	 */
238	FILE_LOCK(rf);
239	rf->f_flag = FREAD | FWRITE;
240	rf->f_type = DTYPE_PIPE;
241	rf->f_data = (caddr_t)rpipe;
242	rf->f_ops = &pipeops;
243	FILE_UNLOCK(rf);
244	error = falloc(td, &wf, &fd);
245	if (error) {
246		FILEDESC_LOCK(fdp);
247		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
248			fdp->fd_ofiles[td->td_retval[0]] = NULL;
249			FILEDESC_UNLOCK(fdp);
250			fdrop(rf, td);
251		} else
252			FILEDESC_UNLOCK(fdp);
253		fdrop(rf, td);
254		/* rpipe has been closed by fdrop(). */
255		pipeclose(wpipe);
256		free(pmtx, M_TEMP);
257		return (error);
258	}
259	FILE_LOCK(wf);
260	wf->f_flag = FREAD | FWRITE;
261	wf->f_type = DTYPE_PIPE;
262	wf->f_data = (caddr_t)wpipe;
263	wf->f_ops = &pipeops;
264	FILE_UNLOCK(wf);
265	td->td_retval[1] = fd;
266	rpipe->pipe_peer = wpipe;
267	wpipe->pipe_peer = rpipe;
268	mtx_init(pmtx, "pipe mutex", NULL, MTX_DEF | MTX_RECURSE);
269	rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
270	fdrop(rf, td);
271
272	return (0);
273}
274
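/*
 * Userland view (editorial, illustrative only): the syscall above backs
 * the familiar pipe(2) idiom; fds[0] is the read side (rpipe) and fds[1]
 * the write side (wpipe):
 *
 *	#include <err.h>
 *	#include <unistd.h>
 *
 *	int fds[2];
 *	char buf[8];
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	(void)write(fds[1], "hello", 5);  -- small write, buffered path
 *	(void)read(fds[0], buf, 5);
 *	(void)close(fds[0]);
 *	(void)close(fds[1]);
 */
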
275/*
276 * Allocate kva for the pipe circular buffer; the space is pageable.
277 * This routine will 'realloc' the size of a pipe safely: if it fails,
278 * it will retain the old buffer.
279 * On failure it returns ENOMEM.
280 */
281static int
282pipespace(cpipe, size)
283	struct pipe *cpipe;
284	int size;
285{
286	struct vm_object *object;
287	caddr_t buffer;
288	int npages, error;
289
290	GIANT_REQUIRED;
291	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
292	       ("pipespace: pipe mutex locked"));
293
294	npages = round_page(size)/PAGE_SIZE;
295	/*
296	 * Create an object; I don't like the idea of paging to/from
297	 * kernel_object.
298	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
299	 */
300	object = vm_object_allocate(OBJT_DEFAULT, npages);
301	buffer = (caddr_t) vm_map_min(kernel_map);
302
303	/*
304	 * Insert the object into the kernel map, and allocate kva for it.
305	 * The map entry is, by default, pageable.
306	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
307	 */
308	error = vm_map_find(kernel_map, object, 0,
309		(vm_offset_t *) &buffer, size, 1,
310		VM_PROT_ALL, VM_PROT_ALL, 0);
311
312	if (error != KERN_SUCCESS) {
313		vm_object_deallocate(object);
314		return (ENOMEM);
315	}
316
317	/* free old resources if we're resizing */
318	pipe_free_kmem(cpipe);
319	cpipe->pipe_buffer.object = object;
320	cpipe->pipe_buffer.buffer = buffer;
321	cpipe->pipe_buffer.size = size;
322	cpipe->pipe_buffer.in = 0;
323	cpipe->pipe_buffer.out = 0;
324	cpipe->pipe_buffer.cnt = 0;
325	amountpipekva += cpipe->pipe_buffer.size;
326	return (0);
327}
328
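/*
 * Editorial note: pipespace() runs with Giant held and without the pipe
 * mutex (see the assertions above).  pipe_create() uses it for the
 * initial PIPE_SIZE allocation, and pipe_write() may call it again with
 * BIG_PIPE_SIZE to grow the buffer for large writers.
 */
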
329/*
330 * initialize and allocate VM and memory for pipe
331 */
332static int
333pipe_create(cpipep)
334	struct pipe **cpipep;
335{
336	struct pipe *cpipe;
337	int error;
338
339	*cpipep = uma_zalloc(pipe_zone, M_WAITOK);
340	if (*cpipep == NULL)
341		return (ENOMEM);
342
343	cpipe = *cpipep;
344
345	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
346	cpipe->pipe_buffer.object = NULL;
347#ifndef PIPE_NODIRECT
348	cpipe->pipe_map.kva = 0;
349#endif
350	/*
351	 * protect so pipeclose() doesn't follow a junk pointer
352	 * if pipespace() fails.
353	 */
354	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
355	cpipe->pipe_state = 0;
356	cpipe->pipe_peer = NULL;
357	cpipe->pipe_busy = 0;
358
359#ifndef PIPE_NODIRECT
360	/*
361	 * pipe data structure initializations to support direct pipe I/O
362	 */
363	cpipe->pipe_map.cnt = 0;
364	cpipe->pipe_map.kva = 0;
365	cpipe->pipe_map.pos = 0;
366	cpipe->pipe_map.npages = 0;
367	/* cpipe->pipe_map.ms[] = invalid */
368#endif
369
370	cpipe->pipe_mtxp = NULL;	/* avoid pipespace assertion */
371	error = pipespace(cpipe, PIPE_SIZE);
372	if (error)
373		return (error);
374
375	vfs_timestamp(&cpipe->pipe_ctime);
376	cpipe->pipe_atime = cpipe->pipe_ctime;
377	cpipe->pipe_mtime = cpipe->pipe_ctime;
378
379	return (0);
380}
381
382
383/*
384 * lock a pipe for I/O, blocking other access
385 */
386static __inline int
387pipelock(cpipe, catch)
388	struct pipe *cpipe;
389	int catch;
390{
391	int error;
392
393	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
394	while (cpipe->pipe_state & PIPE_LOCKFL) {
395		cpipe->pipe_state |= PIPE_LWANT;
396		error = msleep(cpipe, PIPE_MTX(cpipe),
397		    catch ? (PRIBIO | PCATCH) : PRIBIO,
398		    "pipelk", 0);
399		if (error != 0)
400			return (error);
401	}
402	cpipe->pipe_state |= PIPE_LOCKFL;
403	return (0);
404}
405
406/*
407 * unlock a pipe I/O lock
408 */
409static __inline void
410pipeunlock(cpipe)
411	struct pipe *cpipe;
412{
413
414	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
415	cpipe->pipe_state &= ~PIPE_LOCKFL;
416	if (cpipe->pipe_state & PIPE_LWANT) {
417		cpipe->pipe_state &= ~PIPE_LWANT;
418		wakeup(cpipe);
419	}
420}
421
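/*
 * Editorial note: pipelock()/pipeunlock() bracket a logical I/O operation
 * so that the pipe mutex can be dropped around uiomove() and VM calls
 * without letting another I/O interleave.  Both are called with PIPE_MTX
 * held; pipelock() may sleep, interruptibly when 'catch' is set.
 */
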
422static __inline void
423pipeselwakeup(cpipe)
424	struct pipe *cpipe;
425{
426
427	if (cpipe->pipe_state & PIPE_SEL) {
428		cpipe->pipe_state &= ~PIPE_SEL;
429		selwakeup(&cpipe->pipe_sel);
430	}
431	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
432		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
433	KNOTE(&cpipe->pipe_sel.si_note, 0);
434}
435
436/* ARGSUSED */
437static int
438pipe_read(fp, uio, cred, flags, td)
439	struct file *fp;
440	struct uio *uio;
441	struct ucred *cred;
442	struct thread *td;
443	int flags;
444{
445	struct pipe *rpipe = (struct pipe *) fp->f_data;
446	int error;
447	int nread = 0;
448	u_int size;
449
450	PIPE_LOCK(rpipe);
451	++rpipe->pipe_busy;
452	error = pipelock(rpipe, 1);
453	if (error)
454		goto unlocked_error;
455
456	while (uio->uio_resid) {
457		/*
458		 * normal pipe buffer receive
459		 */
460		if (rpipe->pipe_buffer.cnt > 0) {
461			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
462			if (size > rpipe->pipe_buffer.cnt)
463				size = rpipe->pipe_buffer.cnt;
464			if (size > (u_int) uio->uio_resid)
465				size = (u_int) uio->uio_resid;
466
467			PIPE_UNLOCK(rpipe);
468			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
469					size, uio);
470			PIPE_LOCK(rpipe);
471			if (error)
472				break;
473
474			rpipe->pipe_buffer.out += size;
475			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
476				rpipe->pipe_buffer.out = 0;
477
478			rpipe->pipe_buffer.cnt -= size;
479
480			/*
481			 * If there is no more to read in the pipe, reset
482			 * its pointers to the beginning.  This improves
483			 * cache hit stats.
484			 */
485			if (rpipe->pipe_buffer.cnt == 0) {
486				rpipe->pipe_buffer.in = 0;
487				rpipe->pipe_buffer.out = 0;
488			}
489			nread += size;
490#ifndef PIPE_NODIRECT
491		/*
492		 * Direct copy, bypassing a kernel buffer.
493		 */
494		} else if ((size = rpipe->pipe_map.cnt) &&
495			   (rpipe->pipe_state & PIPE_DIRECTW)) {
496			caddr_t	va;
497			if (size > (u_int) uio->uio_resid)
498				size = (u_int) uio->uio_resid;
499
500			va = (caddr_t) rpipe->pipe_map.kva +
501			    rpipe->pipe_map.pos;
502			PIPE_UNLOCK(rpipe);
503			error = uiomove(va, size, uio);
504			PIPE_LOCK(rpipe);
505			if (error)
506				break;
507			nread += size;
508			rpipe->pipe_map.pos += size;
509			rpipe->pipe_map.cnt -= size;
510			if (rpipe->pipe_map.cnt == 0) {
511				rpipe->pipe_state &= ~PIPE_DIRECTW;
512				wakeup(rpipe);
513			}
514#endif
515		} else {
516			/*
517			 * detect EOF condition
518			 * read returns 0 on EOF, no need to set error
519			 */
520			if (rpipe->pipe_state & PIPE_EOF)
521				break;
522
523			/*
524			 * If the "write-side" has been blocked, wake it up now.
525			 */
526			if (rpipe->pipe_state & PIPE_WANTW) {
527				rpipe->pipe_state &= ~PIPE_WANTW;
528				wakeup(rpipe);
529			}
530
531			/*
532			 * Break if some data was read.
533			 */
534			if (nread > 0)
535				break;
536
537			/*
538			 * Unlock the pipe buffer for our remaining processing.  We
539			 * will either break out with an error or we will sleep and
540			 * relock to loop.
541			 */
542			pipeunlock(rpipe);
543
544			/*
545			 * Handle non-blocking mode operation or
546			 * wait for more data.
547			 */
548			if (fp->f_flag & FNONBLOCK) {
549				error = EAGAIN;
550			} else {
551				rpipe->pipe_state |= PIPE_WANTR;
552				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
553				    PRIBIO | PCATCH,
554				    "piperd", 0)) == 0)
555					error = pipelock(rpipe, 1);
556			}
557			if (error)
558				goto unlocked_error;
559		}
560	}
561	pipeunlock(rpipe);
562
563	/* XXX: should probably do this before getting any locks. */
564	if (error == 0)
565		vfs_timestamp(&rpipe->pipe_atime);
566unlocked_error:
567	--rpipe->pipe_busy;
568
569	/*
570	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
571	 */
572	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
573		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
574		wakeup(rpipe);
575	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
576		/*
577		 * Handle write blocking hysteresis.
578		 */
579		if (rpipe->pipe_state & PIPE_WANTW) {
580			rpipe->pipe_state &= ~PIPE_WANTW;
581			wakeup(rpipe);
582		}
583	}
584
585	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
586		pipeselwakeup(rpipe);
587
588	PIPE_UNLOCK(rpipe);
589	return (error);
590}
591
592#ifndef PIPE_NODIRECT
593/*
594 * Map the sending process's buffer into kernel space and wire it.
595 * This is similar to a physical write operation.
596 */
597static int
598pipe_build_write_buffer(wpipe, uio)
599	struct pipe *wpipe;
600	struct uio *uio;
601{
602	vm_map_t map;
603	vm_map_entry_t me;
604	vm_object_t obj;
605	vm_pindex_t pidx;
606	vm_prot_t prot;
607	vm_page_t m;
608	boolean_t wired;
609	u_int size;
610	int i, rv;
611	vm_offset_t addr, endaddr;
612
613	GIANT_REQUIRED;
614	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
615
616	size = (u_int) uio->uio_iov->iov_len;
617	if (size > wpipe->pipe_buffer.size)
618		size = wpipe->pipe_buffer.size;
619
620	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
621	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
622	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
623		map = &curproc->p_vmspace->vm_map;
624		rv = KERN_FAILURE;
625		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
626		    (rv = vm_map_lookup(&map, addr, VM_PROT_READ, &me, &obj,
627		     &pidx, &prot, &wired)) != KERN_SUCCESS ||
628		    (m = vm_page_lookup(obj, pidx)) == NULL) {
629			int j;
630
631			if (rv == KERN_SUCCESS)
632				vm_map_lookup_done(map, me);
633			for (j = 0; j < i; j++)
634				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
635			return (EFAULT);
636		}
637
638		vm_page_wire(m);
639		vm_map_lookup_done(map, me);
640		wpipe->pipe_map.ms[i] = m;
641	}
642
643/*
644 * set up the control block
645 */
646	wpipe->pipe_map.npages = i;
647	wpipe->pipe_map.pos =
648	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
649	wpipe->pipe_map.cnt = size;
650
651/*
652 * and map the buffer
653 */
654	if (wpipe->pipe_map.kva == 0) {
655		/*
656		 * We need to allocate space for an extra page because an
657		 * unaligned address range of this size can span one more page.
658		 */
659		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
660			wpipe->pipe_buffer.size + PAGE_SIZE);
661		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
662	}
663	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
664		wpipe->pipe_map.npages);
665
666/*
667 * and update the uio data
668 */
669
670	uio->uio_iov->iov_len -= size;
671	uio->uio_iov->iov_base += size;
672	if (uio->uio_iov->iov_len == 0)
673		uio->uio_iov++;
674	uio->uio_resid -= size;
675	uio->uio_offset += size;
676	return (0);
677}
678
679/*
680 * unmap and unwire the process buffer
681 */
682static void
683pipe_destroy_write_buffer(wpipe)
684	struct pipe *wpipe;
685{
686	int i;
687
688	GIANT_REQUIRED;
689	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
690
691	if (wpipe->pipe_map.kva) {
692		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
693
694		if (amountpipekva > MAXPIPEKVA) {
695			vm_offset_t kva = wpipe->pipe_map.kva;
696			wpipe->pipe_map.kva = 0;
697			kmem_free(kernel_map, kva,
698				wpipe->pipe_buffer.size + PAGE_SIZE);
699			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
700		}
701	}
702	for (i = 0; i < wpipe->pipe_map.npages; i++)
703		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
704	wpipe->pipe_map.npages = 0;
705}
706
707/*
708 * In the case of a signal, the writing process might go away.  This
709 * code copies the data into the circular buffer so that the source
710 * pages can be freed without loss of data.
711 */
712static void
713pipe_clone_write_buffer(wpipe)
714	struct pipe *wpipe;
715{
716	int size;
717	int pos;
718
719	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
720	size = wpipe->pipe_map.cnt;
721	pos = wpipe->pipe_map.pos;
722
723	wpipe->pipe_buffer.in = size;
724	wpipe->pipe_buffer.out = 0;
725	wpipe->pipe_buffer.cnt = size;
726	wpipe->pipe_state &= ~PIPE_DIRECTW;
727
728	PIPE_GET_GIANT(wpipe);
729	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
730	    (caddr_t) wpipe->pipe_buffer.buffer, size);
731	pipe_destroy_write_buffer(wpipe);
732	PIPE_DROP_GIANT(wpipe);
733}
734
735/*
736 * This implements the pipe buffer write mechanism.  Note that only
737 * a direct write OR a normal pipe write can be pending at any given time.
738 * If there are any characters in the pipe buffer, the direct write will
739 * be deferred until the receiving process grabs all of the bytes from
740 * the pipe buffer.  Then the direct mapping write is set up.
741 */
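/*
 * Editorial summary of the handshake implemented below: the writer wires
 * its user pages, sets PIPE_DIRECTW and sleeps; pipe_read() copies out of
 * pipe_map and clears PIPE_DIRECTW once the last byte is consumed, waking
 * the writer.  If the writer is instead woken by a signal while data
 * remains, pipe_clone_write_buffer() copies the residue into the regular
 * kernel buffer so the wired user pages can be released safely.
 */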
742static int
743pipe_direct_write(wpipe, uio)
744	struct pipe *wpipe;
745	struct uio *uio;
746{
747	int error;
748
749retry:
750	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
751	while (wpipe->pipe_state & PIPE_DIRECTW) {
752		if (wpipe->pipe_state & PIPE_WANTR) {
753			wpipe->pipe_state &= ~PIPE_WANTR;
754			wakeup(wpipe);
755		}
756		wpipe->pipe_state |= PIPE_WANTW;
757		error = msleep(wpipe, PIPE_MTX(wpipe),
758		    PRIBIO | PCATCH, "pipdww", 0);
759		if (error)
760			goto error1;
761		if (wpipe->pipe_state & PIPE_EOF) {
762			error = EPIPE;
763			goto error1;
764		}
765	}
766	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
767	if (wpipe->pipe_buffer.cnt > 0) {
768		if (wpipe->pipe_state & PIPE_WANTR) {
769			wpipe->pipe_state &= ~PIPE_WANTR;
770			wakeup(wpipe);
771		}
772
773		wpipe->pipe_state |= PIPE_WANTW;
774		error = msleep(wpipe, PIPE_MTX(wpipe),
775		    PRIBIO | PCATCH, "pipdwc", 0);
776		if (error)
777			goto error1;
778		if (wpipe->pipe_state & PIPE_EOF) {
779			error = EPIPE;
780			goto error1;
781		}
782		goto retry;
783	}
784
785	wpipe->pipe_state |= PIPE_DIRECTW;
786
787	pipelock(wpipe, 0);
788	PIPE_GET_GIANT(wpipe);
789	error = pipe_build_write_buffer(wpipe, uio);
790	PIPE_DROP_GIANT(wpipe);
791	pipeunlock(wpipe);
792	if (error) {
793		wpipe->pipe_state &= ~PIPE_DIRECTW;
794		goto error1;
795	}
796
797	error = 0;
798	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
799		if (wpipe->pipe_state & PIPE_EOF) {
800			pipelock(wpipe, 0);
801			PIPE_GET_GIANT(wpipe);
802			pipe_destroy_write_buffer(wpipe);
803			PIPE_DROP_GIANT(wpipe);
804			pipeunlock(wpipe);
805			pipeselwakeup(wpipe);
806			error = EPIPE;
807			goto error1;
808		}
809		if (wpipe->pipe_state & PIPE_WANTR) {
810			wpipe->pipe_state &= ~PIPE_WANTR;
811			wakeup(wpipe);
812		}
813		pipeselwakeup(wpipe);
814		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
815		    "pipdwt", 0);
816	}
817
818	pipelock(wpipe,0);
819	if (wpipe->pipe_state & PIPE_DIRECTW) {
820		/*
821		 * this bit of trickery substitutes a pageable kernel buffer
822		 * for the user buffer of a process that might be going away.
823		 */
824		pipe_clone_write_buffer(wpipe);
825	} else {
826		PIPE_GET_GIANT(wpipe);
827		pipe_destroy_write_buffer(wpipe);
828		PIPE_DROP_GIANT(wpipe);
829	}
830	pipeunlock(wpipe);
831	return (error);
832
833error1:
834	wakeup(wpipe);
835	return (error);
836}
837#endif
838
839static int
840pipe_write(fp, uio, cred, flags, td)
841	struct file *fp;
842	struct uio *uio;
843	struct ucred *cred;
844	struct thread *td;
845	int flags;
846{
847	int error = 0;
848	int orig_resid;
849	struct pipe *wpipe, *rpipe;
850
851	rpipe = (struct pipe *) fp->f_data;
852	wpipe = rpipe->pipe_peer;
853
854	PIPE_LOCK(rpipe);
855	/*
856	 * detect loss of pipe read side; return EPIPE (caller delivers SIGPIPE).
857	 */
858	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
859		PIPE_UNLOCK(rpipe);
860		return (EPIPE);
861	}
862	++wpipe->pipe_busy;
863
864	/*
865	 * If it is advantageous to resize the pipe buffer, do
866	 * so.
867	 */
868	if ((uio->uio_resid > PIPE_SIZE) &&
869		(nbigpipe < LIMITBIGPIPES) &&
870		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
871		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
872		(wpipe->pipe_buffer.cnt == 0)) {
873
874		if ((error = pipelock(wpipe,1)) == 0) {
875			PIPE_GET_GIANT(wpipe);
876			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
877				nbigpipe++;
878			PIPE_DROP_GIANT(wpipe);
879			pipeunlock(wpipe);
880		}
881	}
882
883	/*
884	 * If an early error occurred, unbusy and return, waking up any pending
885	 * readers.
886	 */
887	if (error) {
888		--wpipe->pipe_busy;
889		if ((wpipe->pipe_busy == 0) &&
890		    (wpipe->pipe_state & PIPE_WANT)) {
891			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
892			wakeup(wpipe);
893		}
894		PIPE_UNLOCK(rpipe);
895		return(error);
896	}
897
898	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
899
900	orig_resid = uio->uio_resid;
901
902	while (uio->uio_resid) {
903		int space;
904
905#ifndef PIPE_NODIRECT
906		/*
907		 * If the transfer is large, we can gain performance if
908		 * we do process-to-process copies directly.
909		 * If the write is non-blocking, we don't use the
910		 * direct write mechanism.
911		 *
912		 * The direct write mechanism will detect the reader going
913		 * away on us.
914		 */
915		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
916		    (fp->f_flag & FNONBLOCK) == 0 &&
917		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
919			error = pipe_direct_write(wpipe, uio);
920			if (error)
921				break;
922			continue;
923		}
924#endif
925
926		/*
927		 * Pipe buffered writes cannot proceed concurrently with
928		 * direct writes.  We wait until the currently executing
929		 * direct write is completed before we start filling the
930		 * pipe buffer.  We break out if a signal occurs or the
931		 * reader goes away.
932		 */
933	retrywrite:
934		while (wpipe->pipe_state & PIPE_DIRECTW) {
935			if (wpipe->pipe_state & PIPE_WANTR) {
936				wpipe->pipe_state &= ~PIPE_WANTR;
937				wakeup(wpipe);
938			}
939			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
940			    "pipbww", 0);
941			if (wpipe->pipe_state & PIPE_EOF)
942				break;
943			if (error)
944				break;
945		}
946		if (wpipe->pipe_state & PIPE_EOF) {
947			error = EPIPE;
948			break;
949		}
950
951		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
952
953		/* Writes of size <= PIPE_BUF must be atomic. */
954		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
955			space = 0;
956
957		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
958			if ((error = pipelock(wpipe,1)) == 0) {
959				int size;	/* Transfer size */
960				int segsize;	/* first segment to transfer */
961
962				/*
963				 * It is possible for a direct write to
964				 * slip in on us... handle it here...
965				 */
966				if (wpipe->pipe_state & PIPE_DIRECTW) {
967					pipeunlock(wpipe);
968					goto retrywrite;
969				}
970				/*
971				 * If a process blocked in uiomove, our
972				 * value for space might be bad.
973				 *
974				 * XXX will we be ok if the reader has gone
975				 * away here?
976				 */
977				if (space > wpipe->pipe_buffer.size -
978				    wpipe->pipe_buffer.cnt) {
979					pipeunlock(wpipe);
980					goto retrywrite;
981				}
982
983				/*
984				 * Transfer size is minimum of uio transfer
985				 * and free space in pipe buffer.
986				 */
987				if (space > uio->uio_resid)
988					size = uio->uio_resid;
989				else
990					size = space;
991				/*
992				 * First segment to transfer is minimum of
993				 * transfer size and contiguous space in
994				 * pipe buffer.  If first segment to transfer
995				 * is less than the transfer size, we've got
996				 * a wraparound in the buffer.
997				 */
998				segsize = wpipe->pipe_buffer.size -
999					wpipe->pipe_buffer.in;
1000				if (segsize > size)
1001					segsize = size;
1002
1003				/* Transfer first segment */
1004
1005				PIPE_UNLOCK(rpipe);
1006				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
1007						segsize, uio);
1008				PIPE_LOCK(rpipe);
1009
1010				if (error == 0 && segsize < size) {
1011					/*
1012					 * Transfer remaining part now, to
1013					 * support atomic writes.  Wraparound
1014					 * happened.
1015					 */
1016					if (wpipe->pipe_buffer.in + segsize !=
1017					    wpipe->pipe_buffer.size)
1018						panic("Expected pipe buffer wraparound disappeared");
1019
1020					PIPE_UNLOCK(rpipe);
1021					error = uiomove(&wpipe->pipe_buffer.buffer[0],
1022							size - segsize, uio);
1023					PIPE_LOCK(rpipe);
1024				}
1025				if (error == 0) {
1026					wpipe->pipe_buffer.in += size;
1027					if (wpipe->pipe_buffer.in >=
1028					    wpipe->pipe_buffer.size) {
1029						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1030							panic("Expected wraparound bad");
1031						wpipe->pipe_buffer.in = size - segsize;
1032					}
1033
1034					wpipe->pipe_buffer.cnt += size;
1035					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1036						panic("Pipe buffer overflow");
1037
1038				}
1039				pipeunlock(wpipe);
1040			}
1041			if (error)
1042				break;
1043
1044		} else {
1045			/*
1046			 * If the "read-side" has been blocked, wake it up now.
1047			 */
1048			if (wpipe->pipe_state & PIPE_WANTR) {
1049				wpipe->pipe_state &= ~PIPE_WANTR;
1050				wakeup(wpipe);
1051			}
1052
1053			/*
1054			 * don't block on non-blocking I/O
1055			 */
1056			if (fp->f_flag & FNONBLOCK) {
1057				error = EAGAIN;
1058				break;
1059			}
1060
1061			/*
1062			 * We have no more space and have something to offer;
1063			 * wake up select/poll.
1064			 */
1065			pipeselwakeup(wpipe);
1066
1067			wpipe->pipe_state |= PIPE_WANTW;
1068			error = msleep(wpipe, PIPE_MTX(rpipe),
1069			    PRIBIO | PCATCH, "pipewr", 0);
1070			if (error != 0)
1071				break;
1072			/*
1073			 * If the read side wants to go away, return EPIPE;
1074			 * the caller turns that into a SIGPIPE for us.
1075			 */
1076			if (wpipe->pipe_state & PIPE_EOF) {
1077				error = EPIPE;
1078				break;
1079			}
1080		}
1081	}
1082
1083	--wpipe->pipe_busy;
1084
1085	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1086		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1087		wakeup(wpipe);
1088	} else if (wpipe->pipe_buffer.cnt > 0) {
1089		/*
1090		 * If we have put any characters in the buffer, we wake up
1091		 * the reader.
1092		 */
1093		if (wpipe->pipe_state & PIPE_WANTR) {
1094			wpipe->pipe_state &= ~PIPE_WANTR;
1095			wakeup(wpipe);
1096		}
1097	}
1098
1099	/*
1100	 * Don't return EPIPE if I/O was successful
1101	 */
1102	if ((wpipe->pipe_buffer.cnt == 0) &&
1103	    (uio->uio_resid == 0) &&
1104	    (error == EPIPE)) {
1105		error = 0;
1106	}
1107
1108	if (error == 0)
1109		vfs_timestamp(&wpipe->pipe_mtime);
1110
1111	/*
1112	 * We have something to offer;
1113	 * wake up select/poll.
1114	 */
1115	if (wpipe->pipe_buffer.cnt)
1116		pipeselwakeup(wpipe);
1117
1118	PIPE_UNLOCK(rpipe);
1119	return (error);
1120}
1121
1122/*
1123 * we implement a very minimal set of ioctls for compatibility with sockets.
1124 */
1125int
1126pipe_ioctl(fp, cmd, data, td)
1127	struct file *fp;
1128	u_long cmd;
1129	caddr_t data;
1130	struct thread *td;
1131{
1132	struct pipe *mpipe = (struct pipe *)fp->f_data;
1133
1134	switch (cmd) {
1135
1136	case FIONBIO:
1137		return (0);
1138
1139	case FIOASYNC:
1140		PIPE_LOCK(mpipe);
1141		if (*(int *)data) {
1142			mpipe->pipe_state |= PIPE_ASYNC;
1143		} else {
1144			mpipe->pipe_state &= ~PIPE_ASYNC;
1145		}
1146		PIPE_UNLOCK(mpipe);
1147		return (0);
1148
1149	case FIONREAD:
1150		PIPE_LOCK(mpipe);
1151		if (mpipe->pipe_state & PIPE_DIRECTW)
1152			*(int *)data = mpipe->pipe_map.cnt;
1153		else
1154			*(int *)data = mpipe->pipe_buffer.cnt;
1155		PIPE_UNLOCK(mpipe);
1156		return (0);
1157
1158	case FIOSETOWN:
1159		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1160
1161	case FIOGETOWN:
1162		*(int *)data = fgetown(mpipe->pipe_sigio);
1163		return (0);
1164
1165	/* This is deprecated, FIOSETOWN should be used instead. */
1166	case TIOCSPGRP:
1167		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1168
1169	/* This is deprecated, FIOGETOWN should be used instead. */
1170	case TIOCGPGRP:
1171		*(int *)data = -fgetown(mpipe->pipe_sigio);
1172		return (0);
1173
1174	}
1175	return (ENOTTY);
1176}
1177
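/*
 * Userland view (editorial, illustrative only): the common use of these
 * ioctls is querying how much data is buffered for reading:
 *
 *	#include <sys/ioctl.h>
 *	#include <stdio.h>
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes ready to read\n", nbytes);
 */
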
1178int
1179pipe_poll(fp, events, cred, td)
1180	struct file *fp;
1181	int events;
1182	struct ucred *cred;
1183	struct thread *td;
1184{
1185	struct pipe *rpipe = (struct pipe *)fp->f_data;
1186	struct pipe *wpipe;
1187	int revents = 0;
1188
1189	wpipe = rpipe->pipe_peer;
1190	PIPE_LOCK(rpipe);
1191	if (events & (POLLIN | POLLRDNORM))
1192		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1193		    (rpipe->pipe_buffer.cnt > 0) ||
1194		    (rpipe->pipe_state & PIPE_EOF))
1195			revents |= events & (POLLIN | POLLRDNORM);
1196
1197	if (events & (POLLOUT | POLLWRNORM))
1198		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1199		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1200		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1201			revents |= events & (POLLOUT | POLLWRNORM);
1202
1203	if ((rpipe->pipe_state & PIPE_EOF) ||
1204	    (wpipe == NULL) ||
1205	    (wpipe->pipe_state & PIPE_EOF))
1206		revents |= POLLHUP;
1207
1208	if (revents == 0) {
1209		if (events & (POLLIN | POLLRDNORM)) {
1210			selrecord(td, &rpipe->pipe_sel);
1211			rpipe->pipe_state |= PIPE_SEL;
1212		}
1213
1214		if (events & (POLLOUT | POLLWRNORM)) {
1215			selrecord(td, &wpipe->pipe_sel);
1216			wpipe->pipe_state |= PIPE_SEL;
1217		}
1218	}
1219	PIPE_UNLOCK(rpipe);
1220
1221	return (revents);
1222}
1223
1224static int
1225pipe_stat(fp, ub, td)
1226	struct file *fp;
1227	struct stat *ub;
1228	struct thread *td;
1229{
1230	struct pipe *pipe = (struct pipe *)fp->f_data;
1231
1232	bzero((caddr_t)ub, sizeof(*ub));
1233	ub->st_mode = S_IFIFO;
1234	ub->st_blksize = pipe->pipe_buffer.size;
1235	ub->st_size = pipe->pipe_buffer.cnt;
1236	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1237	ub->st_atimespec = pipe->pipe_atime;
1238	ub->st_mtimespec = pipe->pipe_mtime;
1239	ub->st_ctimespec = pipe->pipe_ctime;
1240	ub->st_uid = fp->f_cred->cr_uid;
1241	ub->st_gid = fp->f_cred->cr_gid;
1242	/*
1243	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1244	 * XXX (st_dev, st_ino) should be unique.
1245	 */
1246	return (0);
1247}
1248
1249/* ARGSUSED */
1250static int
1251pipe_close(fp, td)
1252	struct file *fp;
1253	struct thread *td;
1254{
1255	struct pipe *cpipe = (struct pipe *)fp->f_data;
1256
1257	fp->f_ops = &badfileops;
1258	fp->f_data = NULL;
1259	funsetown(cpipe->pipe_sigio);
1260	pipeclose(cpipe);
1261	return (0);
1262}
1263
1264static void
1265pipe_free_kmem(cpipe)
1266	struct pipe *cpipe;
1267{
1268
1269	GIANT_REQUIRED;
1270	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1271	       ("pipespace: pipe mutex locked"));
1272
1273	if (cpipe->pipe_buffer.buffer != NULL) {
1274		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1275			--nbigpipe;
1276		amountpipekva -= cpipe->pipe_buffer.size;
1277		kmem_free(kernel_map,
1278			(vm_offset_t)cpipe->pipe_buffer.buffer,
1279			cpipe->pipe_buffer.size);
1280		cpipe->pipe_buffer.buffer = NULL;
1281	}
1282#ifndef PIPE_NODIRECT
1283	if (cpipe->pipe_map.kva != 0) {
1284		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1285		kmem_free(kernel_map,
1286			cpipe->pipe_map.kva,
1287			cpipe->pipe_buffer.size + PAGE_SIZE);
1288		cpipe->pipe_map.cnt = 0;
1289		cpipe->pipe_map.kva = 0;
1290		cpipe->pipe_map.pos = 0;
1291		cpipe->pipe_map.npages = 0;
1292	}
1293#endif
1294}
1295
1296/*
1297 * shut down the pipe
1298 */
1299static void
1300pipeclose(cpipe)
1301	struct pipe *cpipe;
1302{
1303	struct pipe *ppipe;
1304	int hadpeer;
1305
1306	if (cpipe == NULL)
1307		return;
1308
1309	hadpeer = 0;
1310
1311	/* partially created pipes won't have a valid mutex. */
1312	if (PIPE_MTX(cpipe) != NULL)
1313		PIPE_LOCK(cpipe);
1314
1315	pipeselwakeup(cpipe);
1316
1317	/*
1318	 * If the other side is blocked, wake it up saying that
1319	 * we want to close it down.
1320	 */
1321	while (cpipe->pipe_busy) {
1322		wakeup(cpipe);
1323		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1324		msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1325	}
1326
1327	/*
1328	 * Disconnect from peer
1329	 */
1330	if ((ppipe = cpipe->pipe_peer) != NULL) {
1331		hadpeer++;
1332		pipeselwakeup(ppipe);
1333
1334		ppipe->pipe_state |= PIPE_EOF;
1335		wakeup(ppipe);
1336		KNOTE(&ppipe->pipe_sel.si_note, 0);
1337		ppipe->pipe_peer = NULL;
1338	}
1339	/*
1340	 * free resources
1341	 */
1342	if (PIPE_MTX(cpipe) != NULL) {
1343		PIPE_UNLOCK(cpipe);
1344		if (!hadpeer) {
1345			mtx_destroy(PIPE_MTX(cpipe));
1346			free(PIPE_MTX(cpipe), M_TEMP);
1347		}
1348	}
1349	mtx_lock(&Giant);
1350	pipe_free_kmem(cpipe);
1351	uma_zfree(pipe_zone, cpipe);
1352	mtx_unlock(&Giant);
1353}
1354
1355/*ARGSUSED*/
1356static int
1357pipe_kqfilter(struct file *fp, struct knote *kn)
1358{
1359	struct pipe *cpipe;
1360
1361	cpipe = (struct pipe *)kn->kn_fp->f_data;
1362	switch (kn->kn_filter) {
1363	case EVFILT_READ:
1364		kn->kn_fop = &pipe_rfiltops;
1365		break;
1366	case EVFILT_WRITE:
1367		kn->kn_fop = &pipe_wfiltops;
1368		cpipe = cpipe->pipe_peer;
1369		break;
1370	default:
1371		return (1);
1372	}
1373	kn->kn_hook = (caddr_t)cpipe;
1374
1375	PIPE_LOCK(cpipe);
1376	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1377	PIPE_UNLOCK(cpipe);
1378	return (0);
1379}
1380
1381static void
1382filt_pipedetach(struct knote *kn)
1383{
1384	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1385
1386	PIPE_LOCK(cpipe);
1387	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1388	PIPE_UNLOCK(cpipe);
1389}
1390
1391/*ARGSUSED*/
1392static int
1393filt_piperead(struct knote *kn, long hint)
1394{
1395	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1396	struct pipe *wpipe = rpipe->pipe_peer;
1397
1398	PIPE_LOCK(rpipe);
1399	kn->kn_data = rpipe->pipe_buffer.cnt;
1400	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1401		kn->kn_data = rpipe->pipe_map.cnt;
1402
1403	if ((rpipe->pipe_state & PIPE_EOF) ||
1404	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1405		kn->kn_flags |= EV_EOF;
1406		PIPE_UNLOCK(rpipe);
1407		return (1);
1408	}
1409	PIPE_UNLOCK(rpipe);
1410	return (kn->kn_data > 0);
1411}
1412
1413/*ARGSUSED*/
1414static int
1415filt_pipewrite(struct knote *kn, long hint)
1416{
1417	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1418	struct pipe *wpipe = rpipe->pipe_peer;
1419
1420	PIPE_LOCK(rpipe);
1421	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1422		kn->kn_data = 0;
1423		kn->kn_flags |= EV_EOF;
1424		PIPE_UNLOCK(rpipe);
1425		return (1);
1426	}
1427	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1428	if (wpipe->pipe_state & PIPE_DIRECTW)
1429		kn->kn_data = 0;
1430
1431	PIPE_UNLOCK(rpipe);
1432	return (kn->kn_data >= PIPE_BUF);
1433}
1434
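/*
 * Userland view (editorial, illustrative only): a kevent(2) registration
 * on a pipe's read descriptor (pfd below) ends up in the filters above;
 * filt_piperead() reports the readable byte count in kev.data.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, pfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	(void)kevent(kq, &kev, 1, NULL, 0, NULL);
 */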