/*	$NetBSD: sys_pipe.c,v 1.167 2024/02/10 09:21:54 andvar Exp $	*/

/*-
 * Copyright (c) 2003, 2007, 2008, 2009, 2023 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Paul Kranenburg, and by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */
/*
 * This file contains a high-performance replacement for the socket-based
 * pipe scheme originally used.  It does not support all the features of
 * sockets, but it does everything that pipes normally do.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.167 2024/02/10 09:21:54 andvar Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);
static int	pipe_fpathconf(file_t *, int, register_t *);
static int	pipe_posix_fadvise(file_t *, off_t, off_t, int);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
	.fo_fpathconf = pipe_fpathconf,
	.fo_posix_fadvise = pipe_posix_fadvise,
};
/*
 * Default pipe buffer size(s).  This can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define	MINPIPESIZE	(PIPE_SIZE / 3)
#define	MAXPIPESIZE	(2 * PIPE_SIZE / 3)
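
/*
 * Sizing sketch (assuming the usual PIPE_SIZE of 16384 bytes; the value
 * is machine-defined): MINPIPESIZE works out to 16384 / 3 = 5461 bytes
 * and MAXPIPESIZE to 2 * 16384 / 3 = 10922 bytes.  MINPIPESIZE serves as
 * a low-water mark in pipe_read() below: once fewer than that many bytes
 * remain buffered, sleeping writers are woken.
 */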

/*
 * Limit the number of "big" pipes
 */
#define	LIMITBIGPIPES	32
static u_int	maxbigpipes __read_mostly = LIMITBIGPIPES;
static u_int	nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int	amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t, struct timespec *);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA. */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}

static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}

/*
 * The pipe system call, for the DTYPE_PIPE type of pipes.
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	struct timespec nt;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	getnanotime(&nt);
	if ((error = pipe_create(&rpipe, pipe_rd_cache, &nt)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache, &nt))) {
		goto free2;
	}
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error = fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}
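
/*
 * Userland sketch (illustrative, not part of the kernel): the flag set
 * accepted above corresponds to pipe2(2):
 *
 *	#include <err.h>
 *	#include <fcntl.h>
 *	#include <stdlib.h>
 *	#include <unistd.h>
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
 *		err(EXIT_FAILURE, "pipe2");
 *
 * (fds[0] is the read end, fds[1] is the write end.)
 */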

/*
 * Allocate KVA for the pipe's circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if allocation
 * fails, the old buffer is retained and ENOMEM is returned.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize a pipe structure and allocate its buffer memory.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache, struct timespec *nt)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime = *nt;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for exclusive I/O, blocking other access.
 * Called with the pipe mutex held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		if (catch_p) {
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0) {
				return error;
			}
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * Unlock a pipe's long-term I/O lock.
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	cv_signal(&pipe->pipe_lkcv);
}
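
/*
 * Usage pattern (a sketch distilled from pipe_read()/pipe_write() below):
 * take the short-term mutex first, then the long-term I/O lock, and drop
 * the mutex around any copy that may fault or sleep:
 *
 *	mutex_enter(pipe->pipe_lock);
 *	error = pipelock(pipe, true);	(may fail if interrupted)
 *	if (error == 0) {
 *		... manipulate the buffer, releasing pipe_lock
 *		    around uiomove() ...
 *		pipeunlock(pipe);
 *	}
 *	mutex_exit(pipe->pipe_lock);
 */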

/*
 * Select/poll wakeup.  This also sends SIGIO to the owner registered on
 * the 'sigp' side of the pipe, if that side has PIPE_ASYNC set.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}

static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	/*
	 * Try to avoid locking the pipe if we have nothing to do.
	 *
	 * There are programs which share one pipe amongst multiple processes
	 * and perform non-blocking reads in parallel, even if the pipe is
	 * empty.  This in particular is the case with BSD make, which when
	 * spawned with a high -j number can find itself with over half of the
	 * calls failing to find anything.
	 */
	if ((fp->f_flag & FNONBLOCK) != 0) {
		if (__predict_false(uio->uio_resid == 0))
			return (0);
		if (atomic_load_relaxed(&bp->cnt) == 0 &&
		    (atomic_load_relaxed(&rpipe->pipe_state) & PIPE_EOF) == 0)
			return (EAGAIN);
	}

	mutex_enter(lock);
	++rpipe->pipe_busy;
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
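
			/*
			 * Worked example (hypothetical numbers): with a
			 * 16384-byte buffer, out = 16000, cnt = 900 and
			 * uio_resid = 4096, the contiguous tail is
			 * 16384 - 16000 = 384 bytes, so size is clamped
			 * to 384; the remaining 516 bytes are picked up
			 * on the next loop iteration once 'out' wraps
			 * back to 0.
			 */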

			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size, uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1   /* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal the writer that it is
	 * now possible to write more data.  Also send the wakeup if this is
	 * the first read since the last write (PIPE_SIGNALR).
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}

static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;
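
		/*
		 * Concretely (a sketch, assuming PIPE_BUF is 512, the
		 * POSIX minimum): a 400-byte write into a pipe with only
		 * 300 bytes free forces space to 0, so the writer sleeps
		 * until all 400 bytes can be queued at once; a 4000-byte
		 * write in the same situation proceeds with a 300-byte
		 * partial transfer.
		 */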

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}
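
			/*
			 * Worked example (hypothetical numbers): with a
			 * 16384-byte buffer, in = 16000 and size = 1000,
			 * segsize is 384, so 384 bytes land at offset
			 * 16000 and the remaining 616 at offset 0; 'in'
			 * then becomes 17000 - 16384 = 616, i.e.
			 * size - segsize.
			 */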

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}

			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}
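
/*
 * Userland sketch (illustrative): FIONREAD reports the bytes currently
 * queued for reading:
 *
 *	#include <sys/ioctl.h>
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes buffered\n", nbytes);
 *
 * FIONWRITE and FIONSPACE examine the peer of the queried descriptor,
 * reporting its byte count and remaining space, respectively.
 */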

int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) || (
			     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}
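
/*
 * Userland sketch (illustrative): per the space check above, the write
 * end polls writable once at least PIPE_BUF bytes are free:
 *
 *	#include <poll.h>
 *
 *	struct pollfd pfd = { .fd = fds[1], .events = POLLOUT };
 *
 *	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLOUT) != 0)
 *		... at least PIPE_BUF bytes may be written ...
 */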

static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/*
	 * Wake both CVs.  One might suffice, but other paths may also need
	 * the wakeup, and broadcasting both avoids having to decide which.
	 */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}

static int
pipe_fpathconf(struct file *fp, int name, register_t *retval)
{

	switch (name) {
	case _PC_PIPE_BUF:
		*retval = PIPE_BUF;
		return 0;
	default:
		return EINVAL;
	}
}
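
/*
 * Userland sketch (illustrative): the atomic-write threshold exposed
 * above can be queried with fpathconf(2):
 *
 *	#include <unistd.h>
 *
 *	long atomic_max = fpathconf(fds[1], _PC_PIPE_BUF);
 *
 * Writes of at most that many bytes are not interleaved with writes by
 * other processes.
 */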

static int
pipe_posix_fadvise(struct file *fp, off_t offset, off_t len, int advice)
{

	return ESPIPE;
}

static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shutdown the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are the ones attached
	 * by the peer.  Since no one will traverse this list, we just
	 * clear it.
	 *
	 * XXX Exposes select/kqueue internals.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
    free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do. */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	selremove_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);
}

static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		rv = kn->kn_data > 0;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		rv = kn->kn_data >= PIPE_BUF;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static const struct filterops pipe_rfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	selrecord_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);

	return (0);
}
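
/*
 * Userland sketch (illustrative): watching the read end with
 * kqueue(2)/kevent(2):
 *
 *	#include <sys/event.h>
 *
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &ev, 1, NULL, 0, NULL);
 *
 * Triggered events report the readable byte count in ev.data and set
 * EV_EOF once the write side has gone away, matching filt_piperead()
 * above.
 */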

/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_NODE, "pipe",
		       SYSCTL_DESCR("Pipe settings"),
		       NULL, 0, NULL, 0,
		       CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "maxbigpipes",
		       SYSCTL_DESCR("Maximum number of \"big\" pipes"),
		       NULL, 0, &maxbigpipes, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_INT, "nbigpipes",
		       SYSCTL_DESCR("Number of \"big\" pipes"),
		       NULL, 0, &nbigpipe, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_INT, "kvasize",
		       SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
				    "buffers"),
		       NULL, 0, &amountpipekva, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}