sys_pipe.c revision 91372
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 91372 2002-02-27 11:27:48Z alfred $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then
33 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, the user's buffer is fully mapped and wired into
35 * the kernel, and the receiving process can copy the data directly from
36 * the pages of the sending process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned back to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process, to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
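/*
 * Illustrative sketch (userland code, not compiled as part of this
 * file): both modes are reached through ordinary pipe(2)/write(2)
 * calls, and the path taken depends only on how each write's size
 * compares with PIPE_MINDIRECT, which is assumed here to lie between
 * the two sizes used below.  The byte counts and variable names are
 * examples only.
 *
 *	char small[128], big[64 * 1024];
 *	int fds[2];
 *
 *	pipe(fds);
 *	if (fork() == 0) {
 *		(void)read(fds[0], small, sizeof(small));
 *		(void)read(fds[0], big, sizeof(big));
 *		_exit(0);
 *	}
 *	write(fds[1], small, sizeof(small));	<- buffered: copied via uiomove()
 *	write(fds[1], big, sizeof(big));	<- direct: pages wired, if
 *						   PIPE_NODIRECT is not defined
 */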
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/poll.h>
64#include <sys/selinfo.h>
65#include <sys/signalvar.h>
66#include <sys/sysproto.h>
67#include <sys/pipe.h>
68#include <sys/proc.h>
69#include <sys/vnode.h>
70#include <sys/uio.h>
71#include <sys/event.h>
72
73#include <vm/vm.h>
74#include <vm/vm_param.h>
75#include <vm/vm_object.h>
76#include <vm/vm_kern.h>
77#include <vm/vm_extern.h>
78#include <vm/pmap.h>
79#include <vm/vm_map.h>
80#include <vm/vm_page.h>
81#include <vm/vm_zone.h>
82
83/*
84 * Use this define if you want to disable *fancy* VM things.  Expect an
85 * approx 30% decrease in transfer rate.  This could be useful for
86 * NetBSD or OpenBSD.
87 */
88/* #define PIPE_NODIRECT */
89
90/*
91 * interfaces to the outside world
92 */
93static int pipe_read __P((struct file *fp, struct uio *uio,
94		struct ucred *cred, int flags, struct thread *td));
95static int pipe_write __P((struct file *fp, struct uio *uio,
96		struct ucred *cred, int flags, struct thread *td));
97static int pipe_close __P((struct file *fp, struct thread *td));
98static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
99		struct thread *td));
100static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
101static int pipe_stat __P((struct file *fp, struct stat *sb, struct thread *td));
102static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct thread *td));
103
104static struct fileops pipeops = {
105	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
106	pipe_stat, pipe_close
107};
108
109static void	filt_pipedetach(struct knote *kn);
110static int	filt_piperead(struct knote *kn, long hint);
111static int	filt_pipewrite(struct knote *kn, long hint);
112
113static struct filterops pipe_rfiltops =
114	{ 1, NULL, filt_pipedetach, filt_piperead };
115static struct filterops pipe_wfiltops =
116	{ 1, NULL, filt_pipedetach, filt_pipewrite };
117
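/*
 * These bracket code that must run under Giant (the VM operations in
 * pipespace() and in the direct-write path).  The pipe mutex is
 * dropped first because that code may sleep, which is not permitted
 * while holding a mutex.
 */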
118#define PIPE_GET_GIANT(pipe)							\
119	do {								\
120		PIPE_UNLOCK(pipe);					\
121		mtx_lock(&Giant);					\
122	} while (0)
123
124#define PIPE_DROP_GIANT(pipe)						\
125	do {								\
126		mtx_unlock(&Giant);					\
127		PIPE_LOCK(pipe);					\
128	} while (0)
129
130/*
131 * Default pipe buffer size(s); this can be kind-of large now because pipe
132 * space is pageable.  The pipe code will try to maintain locality of
133 * reference for performance reasons, so small amounts of outstanding I/O
134 * will not wipe the cache.
135 */
136#define MINPIPESIZE (PIPE_SIZE/3)
137#define MAXPIPESIZE (2*PIPE_SIZE/3)
138
139/*
140 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
141 * is there so that on large systems, we don't exhaust it.
142 */
143#define MAXPIPEKVA (8*1024*1024)
144
145/*
146 * Limit for direct transfers; we cannot, of course, limit
147 * the amount of kva for pipes in general.
148 */
149#define LIMITPIPEKVA (16*1024*1024)
150
151/*
152 * Limit the number of "big" pipes
153 */
154#define LIMITBIGPIPES	32
155static int nbigpipe;
156
157static int amountpipekva;
158
159static void pipeinit __P((void *dummy __unused));
160static void pipeclose __P((struct pipe *cpipe));
161static void pipe_free_kmem __P((struct pipe *cpipe));
162static int pipe_create __P((struct pipe **cpipep));
163static __inline int pipelock __P((struct pipe *cpipe, int catch));
164static __inline void pipeunlock __P((struct pipe *cpipe));
165static __inline void pipeselwakeup __P((struct pipe *cpipe));
166#ifndef PIPE_NODIRECT
167static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
168static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
169static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
170static void pipe_clone_write_buffer __P((struct pipe *wpipe));
171#endif
172static int pipespace __P((struct pipe *cpipe, int size));
173
174static vm_zone_t pipe_zone;
175
176SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
177
178static void
179pipeinit(void *dummy __unused)
180{
181
182	pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
183}
184
185/*
186 * The pipe system call for the DTYPE_PIPE type of pipes
187 */
188
189/* ARGSUSED */
190int
191pipe(td, uap)
192	struct thread *td;
193	struct pipe_args /* {
194		int	dummy;
195	} */ *uap;
196{
197	struct filedesc *fdp = td->td_proc->p_fd;
198	struct file *rf, *wf;
199	struct pipe *rpipe, *wpipe;
200	int fd, error;
201
202	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
203
204	rpipe = wpipe = NULL;
205	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
206		pipeclose(rpipe);
207		pipeclose(wpipe);
208		return (ENFILE);
209	}
210
211	rpipe->pipe_state |= PIPE_DIRECTOK;
212	wpipe->pipe_state |= PIPE_DIRECTOK;
213
214	error = falloc(td, &rf, &fd);
215	if (error) {
216		pipeclose(rpipe);
217		pipeclose(wpipe);
218		return (error);
219	}
220	fhold(rf);
221	td->td_retval[0] = fd;
222
223	/*
224	 * Warning: once we've gotten past allocation of the fd for the
225	 * read-side, we can only drop the read side via fdrop() in order
226	 * to avoid races against processes which manage to dup() the read
227	 * side while we are blocked trying to allocate the write side.
228	 */
229	FILE_LOCK(rf);
230	rf->f_flag = FREAD | FWRITE;
231	rf->f_type = DTYPE_PIPE;
232	rf->f_data = (caddr_t)rpipe;
233	rf->f_ops = &pipeops;
234	FILE_UNLOCK(rf);
235	error = falloc(td, &wf, &fd);
236	if (error) {
237		FILEDESC_LOCK(fdp);
238		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
239			fdp->fd_ofiles[td->td_retval[0]] = NULL;
240			FILEDESC_UNLOCK(fdp);
241			fdrop(rf, td);
242		} else
243			FILEDESC_UNLOCK(fdp);
244		fdrop(rf, td);
245		/* rpipe has been closed by fdrop(). */
246		pipeclose(wpipe);
247		return (error);
248	}
249	FILE_LOCK(wf);
250	wf->f_flag = FREAD | FWRITE;
251	wf->f_type = DTYPE_PIPE;
252	wf->f_data = (caddr_t)wpipe;
253	wf->f_ops = &pipeops;
254	FILE_UNLOCK(wf);
255	td->td_retval[1] = fd;
256	rpipe->pipe_peer = wpipe;
257	wpipe->pipe_peer = rpipe;
258	rpipe->pipe_mtxp = wpipe->pipe_mtxp = mtx_pool_alloc();
259	fdrop(rf, td);
260
261	return (0);
262}
263
264/*
265 * Allocate kva for the pipe circular buffer; the space is pageable.
266 * This routine will 'realloc' the size of a pipe safely; if it fails,
267 * it will retain the old buffer
268 * and return ENOMEM.
269 */
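/*
 * For example, pipe_write() below grows a pipe in place with
 * pipespace(wpipe, BIG_PIPE_SIZE) when a large write arrives; if that
 * fails, the existing PIPE_SIZE buffer simply remains in use.
 */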
270static int
271pipespace(cpipe, size)
272	struct pipe *cpipe;
273	int size;
274{
275	struct vm_object *object;
276	caddr_t buffer;
277	int npages, error;
278
279	GIANT_REQUIRED;
280
281	npages = round_page(size)/PAGE_SIZE;
282	/*
283	 * Create an object; I don't like the idea of paging to/from
284	 * kernel_object.
285	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
286	 */
287	object = vm_object_allocate(OBJT_DEFAULT, npages);
288	buffer = (caddr_t) vm_map_min(kernel_map);
289
290	/*
291	 * Insert the object into the kernel map, and allocate kva for it.
292	 * The map entry is, by default, pageable.
293	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
294	 */
295	error = vm_map_find(kernel_map, object, 0,
296		(vm_offset_t *) &buffer, size, 1,
297		VM_PROT_ALL, VM_PROT_ALL, 0);
298
299	if (error != KERN_SUCCESS) {
300		vm_object_deallocate(object);
301		return (ENOMEM);
302	}
303
304	/* free old resources if we're resizing */
305	pipe_free_kmem(cpipe);
306	cpipe->pipe_buffer.object = object;
307	cpipe->pipe_buffer.buffer = buffer;
308	cpipe->pipe_buffer.size = size;
309	cpipe->pipe_buffer.in = 0;
310	cpipe->pipe_buffer.out = 0;
311	cpipe->pipe_buffer.cnt = 0;
312	amountpipekva += cpipe->pipe_buffer.size;
313	return (0);
314}
315
316/*
317 * initialize and allocate VM and memory for pipe
318 */
319static int
320pipe_create(cpipep)
321	struct pipe **cpipep;
322{
323	struct pipe *cpipe;
324	int error;
325
326	*cpipep = zalloc(pipe_zone);
327	if (*cpipep == NULL)
328		return (ENOMEM);
329
330	cpipe = *cpipep;
331
332	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
333	cpipe->pipe_buffer.object = NULL;
334#ifndef PIPE_NODIRECT
335	cpipe->pipe_map.kva = 0;
336#endif
337	/*
338	 * protect so pipeclose() doesn't follow a junk pointer
339	 * if pipespace() fails.
340	 */
341	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
342	cpipe->pipe_state = 0;
343	cpipe->pipe_peer = NULL;
344	cpipe->pipe_busy = 0;
345
346#ifndef PIPE_NODIRECT
347	/*
348	 * pipe data structure initializations to support direct pipe I/O
349	 */
350	cpipe->pipe_map.cnt = 0;
351	cpipe->pipe_map.kva = 0;
352	cpipe->pipe_map.pos = 0;
353	cpipe->pipe_map.npages = 0;
354	/* cpipe->pipe_map.ms[] = invalid */
355#endif
356
357	error = pipespace(cpipe, PIPE_SIZE);
358	if (error)
359		return (error);
360
361	vfs_timestamp(&cpipe->pipe_ctime);
362	cpipe->pipe_atime = cpipe->pipe_ctime;
363	cpipe->pipe_mtime = cpipe->pipe_ctime;
364
365	return (0);
366}
367
368
369/*
370 * lock a pipe for I/O, blocking other access
371 */
372static __inline int
373pipelock(cpipe, catch)
374	struct pipe *cpipe;
375	int catch;
376{
377	int error;
378
379	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
380	while (cpipe->pipe_state & PIPE_LOCKFL) {
381		cpipe->pipe_state |= PIPE_LWANT;
382		error = msleep(cpipe, PIPE_MTX(cpipe),
383		    catch ? (PRIBIO | PCATCH) : PRIBIO,
384		    "pipelk", 0);
385		if (error != 0)
386			return (error);
387	}
388	cpipe->pipe_state |= PIPE_LOCKFL;
389	return (0);
390}
391
392/*
393 * unlock a pipe I/O lock
394 */
395static __inline void
396pipeunlock(cpipe)
397	struct pipe *cpipe;
398{
399
400	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
401	cpipe->pipe_state &= ~PIPE_LOCKFL;
402	if (cpipe->pipe_state & PIPE_LWANT) {
403		cpipe->pipe_state &= ~PIPE_LWANT;
404		wakeup(cpipe);
405	}
406}
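/*
 * Sketch of the calling convention for the I/O lock above; this is a
 * summary of what pipe_read() and pipe_write() below already do, not
 * new code.  The pipe mutex must be held on entry, pipelock() may
 * drop it while sleeping, and the buffered I/O paths call uiomove()
 * with the mutex dropped but the I/O lock still held:
 *
 *	PIPE_LOCK(cpipe);
 *	error = pipelock(cpipe, 1);
 *	if (error == 0) {
 *		PIPE_UNLOCK(cpipe);
 *		error = uiomove(..., uio);
 *		PIPE_LOCK(cpipe);
 *		pipeunlock(cpipe);
 *	}
 *	PIPE_UNLOCK(cpipe);
 */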
407
408static __inline void
409pipeselwakeup(cpipe)
410	struct pipe *cpipe;
411{
412
413	if (cpipe->pipe_state & PIPE_SEL) {
414		cpipe->pipe_state &= ~PIPE_SEL;
415		selwakeup(&cpipe->pipe_sel);
416	}
417	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
418		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
419	KNOTE(&cpipe->pipe_sel.si_note, 0);
420}
421
422/* ARGSUSED */
423static int
424pipe_read(fp, uio, cred, flags, td)
425	struct file *fp;
426	struct uio *uio;
427	struct ucred *cred;
428	struct thread *td;
429	int flags;
430{
431	struct pipe *rpipe = (struct pipe *) fp->f_data;
432	int error;
433	int nread = 0;
434	u_int size;
435
436	PIPE_LOCK(rpipe);
437	++rpipe->pipe_busy;
438	error = pipelock(rpipe, 1);
439	if (error)
440		goto unlocked_error;
441
442	while (uio->uio_resid) {
443		/*
444		 * normal pipe buffer receive
445		 */
446		if (rpipe->pipe_buffer.cnt > 0) {
447			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
448			if (size > rpipe->pipe_buffer.cnt)
449				size = rpipe->pipe_buffer.cnt;
450			if (size > (u_int) uio->uio_resid)
451				size = (u_int) uio->uio_resid;
452
453			PIPE_UNLOCK(rpipe);
454			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
455					size, uio);
456			PIPE_LOCK(rpipe);
457			if (error)
458				break;
459
460			rpipe->pipe_buffer.out += size;
461			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
462				rpipe->pipe_buffer.out = 0;
463
464			rpipe->pipe_buffer.cnt -= size;
465
466			/*
467			 * If there is no more to read in the pipe, reset
468			 * its pointers to the beginning.  This improves
469			 * cache hit stats.
470			 */
471			if (rpipe->pipe_buffer.cnt == 0) {
472				rpipe->pipe_buffer.in = 0;
473				rpipe->pipe_buffer.out = 0;
474			}
475			nread += size;
476#ifndef PIPE_NODIRECT
477		/*
478		 * Direct copy, bypassing a kernel buffer.
479		 */
480		} else if ((size = rpipe->pipe_map.cnt) &&
481			   (rpipe->pipe_state & PIPE_DIRECTW)) {
482			caddr_t	va;
483			if (size > (u_int) uio->uio_resid)
484				size = (u_int) uio->uio_resid;
485
486			va = (caddr_t) rpipe->pipe_map.kva +
487			    rpipe->pipe_map.pos;
488			PIPE_UNLOCK(rpipe);
489			error = uiomove(va, size, uio);
490			PIPE_LOCK(rpipe);
491			if (error)
492				break;
493			nread += size;
494			rpipe->pipe_map.pos += size;
495			rpipe->pipe_map.cnt -= size;
496			if (rpipe->pipe_map.cnt == 0) {
497				rpipe->pipe_state &= ~PIPE_DIRECTW;
498				wakeup(rpipe);
499			}
500#endif
501		} else {
502			/*
503			 * detect EOF condition
504			 * read returns 0 on EOF, no need to set error
505			 */
506			if (rpipe->pipe_state & PIPE_EOF)
507				break;
508
509			/*
510			 * If the "write-side" has been blocked, wake it up now.
511			 */
512			if (rpipe->pipe_state & PIPE_WANTW) {
513				rpipe->pipe_state &= ~PIPE_WANTW;
514				wakeup(rpipe);
515			}
516
517			/*
518			 * Break if some data was read.
519			 */
520			if (nread > 0)
521				break;
522
523			/*
524			 * Unlock the pipe buffer for our remaining processing.  We
525			 * will either break out with an error or we will sleep and
526			 * relock to loop.
527			 */
528			pipeunlock(rpipe);
529
530			/*
531			 * Handle non-blocking mode operation or
532			 * wait for more data.
533			 */
534			if (fp->f_flag & FNONBLOCK) {
535				error = EAGAIN;
536			} else {
537				rpipe->pipe_state |= PIPE_WANTR;
538				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
539				    PRIBIO | PCATCH,
540				    "piperd", 0)) == 0)
541					error = pipelock(rpipe, 1);
542			}
543			if (error)
544				goto unlocked_error;
545		}
546	}
547	pipeunlock(rpipe);
548
549	/* XXX: should probably do this before getting any locks. */
550	if (error == 0)
551		vfs_timestamp(&rpipe->pipe_atime);
552unlocked_error:
553	--rpipe->pipe_busy;
554
555	/*
556	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
557	 */
558	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
559		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
560		wakeup(rpipe);
561	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
562		/*
563		 * Handle write blocking hysteresis.
564		 */
565		if (rpipe->pipe_state & PIPE_WANTW) {
566			rpipe->pipe_state &= ~PIPE_WANTW;
567			wakeup(rpipe);
568		}
569	}
570
571	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
572		pipeselwakeup(rpipe);
573
574	PIPE_UNLOCK(rpipe);
575	return (error);
576}
577
578#ifndef PIPE_NODIRECT
579/*
580 * Map the sending process's buffer into kernel space and wire it.
581 * This is similar to a physical write operation.
582 */
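/*
 * Worked example of the arithmetic below, with illustrative numbers
 * and assuming a 4K PAGE_SIZE: for an iovec with iov_base 0x20001080
 * and iov_len 10000, addr is truncated to 0x20001000 and endaddr is
 * rounded up to 0x20004000, so three wired pages land in pipe_map.ms[],
 * pipe_map.pos becomes 0x80 and pipe_map.cnt becomes 10000.
 */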
583static int
584pipe_build_write_buffer(wpipe, uio)
585	struct pipe *wpipe;
586	struct uio *uio;
587{
588	u_int size;
589	int i;
590	vm_offset_t addr, endaddr, paddr;
591
592	GIANT_REQUIRED;
593
594	size = (u_int) uio->uio_iov->iov_len;
595	if (size > wpipe->pipe_buffer.size)
596		size = wpipe->pipe_buffer.size;
597
598	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
599	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
600	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
601		vm_page_t m;
602
603		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
604		    (paddr = pmap_kextract(addr)) == 0) {
605			int j;
606
607			for (j = 0; j < i; j++)
608				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
609			return (EFAULT);
610		}
611
612		m = PHYS_TO_VM_PAGE(paddr);
613		vm_page_wire(m);
614		wpipe->pipe_map.ms[i] = m;
615	}
616
617/*
618 * set up the control block
619 */
620	wpipe->pipe_map.npages = i;
621	wpipe->pipe_map.pos =
622	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
623	wpipe->pipe_map.cnt = size;
624
625/*
626 * and map the buffer
627 */
628	if (wpipe->pipe_map.kva == 0) {
629		/*
630		 * We need to allocate space for an extra page because the
631		 * address range might (will) span pages at times.
632		 */
633		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
634			wpipe->pipe_buffer.size + PAGE_SIZE);
635		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
636	}
637	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
638		wpipe->pipe_map.npages);
639
640/*
641 * and update the uio data
642 */
643
644	uio->uio_iov->iov_len -= size;
645	uio->uio_iov->iov_base += size;
646	if (uio->uio_iov->iov_len == 0)
647		uio->uio_iov++;
648	uio->uio_resid -= size;
649	uio->uio_offset += size;
650	return (0);
651}
652
653/*
654 * unmap and unwire the process buffer
655 */
656static void
657pipe_destroy_write_buffer(wpipe)
658	struct pipe *wpipe;
659{
660	int i;
661
662	GIANT_REQUIRED;
663
664	if (wpipe->pipe_map.kva) {
665		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
666
667		if (amountpipekva > MAXPIPEKVA) {
668			vm_offset_t kva = wpipe->pipe_map.kva;
669			wpipe->pipe_map.kva = 0;
670			kmem_free(kernel_map, kva,
671				wpipe->pipe_buffer.size + PAGE_SIZE);
672			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
673		}
674	}
675	for (i = 0; i < wpipe->pipe_map.npages; i++)
676		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
677}
678
679/*
680 * In the case of a signal, the writing process might go away.  This
681 * code copies the data into the circular buffer so that the source
682 * pages can be freed without loss of data.
683 */
684static void
685pipe_clone_write_buffer(wpipe)
686	struct pipe *wpipe;
687{
688	int size;
689	int pos;
690
691	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
692	size = wpipe->pipe_map.cnt;
693	pos = wpipe->pipe_map.pos;
694	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
695	    (caddr_t) wpipe->pipe_buffer.buffer, size);
696
697	wpipe->pipe_buffer.in = size;
698	wpipe->pipe_buffer.out = 0;
699	wpipe->pipe_buffer.cnt = size;
700	wpipe->pipe_state &= ~PIPE_DIRECTW;
701
702	pipe_destroy_write_buffer(wpipe);
703}
704
705/*
706 * This implements the pipe buffer write mechanism.  Note that only
707 * a direct write OR a normal pipe write can be pending at any given time.
708 * If there are any characters in the pipe buffer, the direct write will
709 * be deferred until the receiving process grabs all of the bytes from
710 * the pipe buffer.  Then the direct mapping write is set-up.
711 */
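/*
 * Rough sequence of the handshake implemented below (descriptive
 * summary only; the code that follows is authoritative):
 *
 *	1. The writer waits for any previous PIPE_DIRECTW to finish and
 *	   for the ordinary pipe buffer to drain completely.
 *	2. The writer sets PIPE_DIRECTW, wires its user pages into
 *	   pipe_map, and sleeps on "pipdwt".
 *	3. The reader copies straight out of pipe_map; when pipe_map.cnt
 *	   reaches zero it clears PIPE_DIRECTW and wakes the writer.
 *	4. The writer unwires the pages (or clones the data into the
 *	   kernel buffer if it was interrupted by a signal) and returns.
 */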
712static int
713pipe_direct_write(wpipe, uio)
714	struct pipe *wpipe;
715	struct uio *uio;
716{
717	int error;
718
719retry:
720	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
721	while (wpipe->pipe_state & PIPE_DIRECTW) {
722		if (wpipe->pipe_state & PIPE_WANTR) {
723			wpipe->pipe_state &= ~PIPE_WANTR;
724			wakeup(wpipe);
725		}
726		wpipe->pipe_state |= PIPE_WANTW;
727		error = msleep(wpipe, PIPE_MTX(wpipe),
728		    PRIBIO | PCATCH, "pipdww", 0);
729		if (error)
730			goto error1;
731		if (wpipe->pipe_state & PIPE_EOF) {
732			error = EPIPE;
733			goto error1;
734		}
735	}
736	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
737	if (wpipe->pipe_buffer.cnt > 0) {
738		if (wpipe->pipe_state & PIPE_WANTR) {
739			wpipe->pipe_state &= ~PIPE_WANTR;
740			wakeup(wpipe);
741		}
742
743		wpipe->pipe_state |= PIPE_WANTW;
744		error = msleep(wpipe, PIPE_MTX(wpipe),
745		    PRIBIO | PCATCH, "pipdwc", 0);
746		if (error)
747			goto error1;
748		if (wpipe->pipe_state & PIPE_EOF) {
749			error = EPIPE;
750			goto error1;
751		}
752		goto retry;
753	}
754
755	wpipe->pipe_state |= PIPE_DIRECTW;
756
757	PIPE_GET_GIANT(wpipe);
758	error = pipe_build_write_buffer(wpipe, uio);
759	PIPE_DROP_GIANT(wpipe);
760	if (error) {
761		wpipe->pipe_state &= ~PIPE_DIRECTW;
762		goto error1;
763	}
764
765	error = 0;
766	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
767		if (wpipe->pipe_state & PIPE_EOF) {
768			pipelock(wpipe, 0);
769			PIPE_GET_GIANT(wpipe);
770			pipe_destroy_write_buffer(wpipe);
771			PIPE_DROP_GIANT(wpipe);
772			pipeunlock(wpipe);
773			pipeselwakeup(wpipe);
774			error = EPIPE;
775			goto error1;
776		}
777		if (wpipe->pipe_state & PIPE_WANTR) {
778			wpipe->pipe_state &= ~PIPE_WANTR;
779			wakeup(wpipe);
780		}
781		pipeselwakeup(wpipe);
782		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
783		    "pipdwt", 0);
784	}
785
786	pipelock(wpipe,0);
787	if (wpipe->pipe_state & PIPE_DIRECTW) {
788		/*
789		 * this bit of trickery substitutes a kernel buffer for
790		 * the process that might be going away.
791		 */
792		pipe_clone_write_buffer(wpipe);
793	} else {
794		pipe_destroy_write_buffer(wpipe);
795	}
796	pipeunlock(wpipe);
797	return (error);
798
799error1:
800	wakeup(wpipe);
801	return (error);
802}
803#endif
804
805static int
806pipe_write(fp, uio, cred, flags, td)
807	struct file *fp;
808	struct uio *uio;
809	struct ucred *cred;
810	struct thread *td;
811	int flags;
812{
813	int error = 0;
814	int orig_resid;
815	struct pipe *wpipe, *rpipe;
816
817	rpipe = (struct pipe *) fp->f_data;
818	wpipe = rpipe->pipe_peer;
819
820	PIPE_LOCK(rpipe);	/* wpipe may be NULL; the peers share one mutex */
821	/*
822	 * detect loss of pipe read side, issue SIGPIPE if lost.
823	 */
824	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
825		PIPE_UNLOCK(rpipe);
826		return (EPIPE);
827	}
828	++wpipe->pipe_busy;
829
830	/*
831	 * If it is advantageous to resize the pipe buffer, do
832	 * so.
833	 */
834	if ((uio->uio_resid > PIPE_SIZE) &&
835		(nbigpipe < LIMITBIGPIPES) &&
836		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
837		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
838		(wpipe->pipe_buffer.cnt == 0)) {
839
840		if ((error = pipelock(wpipe,1)) == 0) {
841			PIPE_GET_GIANT(wpipe);
842			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
843				nbigpipe++;
844			PIPE_DROP_GIANT(wpipe);
845			pipeunlock(wpipe);
846		}
847	}
848
849	/*
850	 * If an early error occurred, unbusy and return, waking up any pending
851	 * readers.
852	 */
853	if (error) {
854		--wpipe->pipe_busy;
855		if ((wpipe->pipe_busy == 0) &&
856		    (wpipe->pipe_state & PIPE_WANT)) {
857			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
858			wakeup(wpipe);
859		}
860		PIPE_UNLOCK(wpipe);
861		return(error);
862	}
863
864	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
865
866	orig_resid = uio->uio_resid;
867
868	while (uio->uio_resid) {
869		int space;
870
871#ifndef PIPE_NODIRECT
872		/*
873		 * If the transfer is large, we can gain performance if
874		 * we do process-to-process copies directly.
875		 * If the write is non-blocking, we don't use the
876		 * direct write mechanism.
877		 *
878		 * The direct write mechanism will detect the reader going
879		 * away on us.
880		 */
881		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
882		    (fp->f_flag & FNONBLOCK) == 0 &&
883		    (wpipe->pipe_map.kva ||
884		     (amountpipekva < LIMITPIPEKVA))) {
885			error = pipe_direct_write(wpipe, uio);
886			if (error)
887				break;
888			continue;
889		}
890#endif
891
892		/*
893		 * Pipe buffered writes cannot be coincident with
894		 * direct writes.  We wait until the currently executing
895		 * direct write is completed before we start filling the
896		 * pipe buffer.  We break out if a signal occurs or the
897		 * reader goes away.
898		 */
899	retrywrite:
900		while (wpipe->pipe_state & PIPE_DIRECTW) {
901			if (wpipe->pipe_state & PIPE_WANTR) {
902				wpipe->pipe_state &= ~PIPE_WANTR;
903				wakeup(wpipe);
904			}
905			error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
906			    "pipbww", 0);
907			if (wpipe->pipe_state & PIPE_EOF)
908				break;
909			if (error)
910				break;
911		}
912		if (wpipe->pipe_state & PIPE_EOF) {
913			error = EPIPE;
914			break;
915		}
916
917		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
918
919		/* Writes of size <= PIPE_BUF must be atomic. */
920		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
921			space = 0;
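		/*
		 * Worked example with illustrative numbers: a 16384-byte
		 * buffer holding 16300 bytes leaves space == 84.  A 200-byte
		 * write (orig_resid <= PIPE_BUF) must not be split, so space
		 * is forced to 0 and we wait below for the reader to make
		 * room (or return EAGAIN for a non-blocking descriptor).
		 */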
922
923		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
924			if ((error = pipelock(wpipe,1)) == 0) {
925				int size;	/* Transfer size */
926				int segsize;	/* first segment to transfer */
927
928				/*
929				 * It is possible for a direct write to
930				 * slip in on us... handle it here...
931				 */
932				if (wpipe->pipe_state & PIPE_DIRECTW) {
933					pipeunlock(wpipe);
934					goto retrywrite;
935				}
936				/*
937				 * If a process blocked in uiomove, our
938				 * value for space might be bad.
939				 *
940				 * XXX will we be ok if the reader has gone
941				 * away here?
942				 */
943				if (space > wpipe->pipe_buffer.size -
944				    wpipe->pipe_buffer.cnt) {
945					pipeunlock(wpipe);
946					goto retrywrite;
947				}
948
949				/*
950				 * Transfer size is minimum of uio transfer
951				 * and free space in pipe buffer.
952				 */
953				if (space > uio->uio_resid)
954					size = uio->uio_resid;
955				else
956					size = space;
957				/*
958				 * First segment to transfer is minimum of
959				 * transfer size and contiguous space in
960				 * pipe buffer.  If first segment to transfer
961				 * is less than the transfer size, we've got
962				 * a wraparound in the buffer.
963				 */
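				/*
				 * Worked example with illustrative numbers: for a
				 * 16384-byte buffer with in == 16000 and size == 1000,
				 * segsize is 384; the first uiomove() fills bytes
				 * 16000-16383 and the second copies the remaining 616
				 * bytes to offset 0, leaving "in" at 616.
				 */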
964				segsize = wpipe->pipe_buffer.size -
965					wpipe->pipe_buffer.in;
966				if (segsize > size)
967					segsize = size;
968
969				/* Transfer first segment */
970
971				PIPE_UNLOCK(wpipe);
972				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
973						segsize, uio);
974				PIPE_LOCK(wpipe);
975
976				if (error == 0 && segsize < size) {
977					/*
978					 * Transfer remaining part now, to
979					 * support atomic writes.  Wraparound
980					 * happened.
981					 */
982					if (wpipe->pipe_buffer.in + segsize !=
983					    wpipe->pipe_buffer.size)
984						panic("Expected pipe buffer wraparound disappeared");
985
986					PIPE_UNLOCK(wpipe);
987					error = uiomove(&wpipe->pipe_buffer.buffer[0],
988							size - segsize, uio);
989					PIPE_LOCK(wpipe);
990				}
991				if (error == 0) {
992					wpipe->pipe_buffer.in += size;
993					if (wpipe->pipe_buffer.in >=
994					    wpipe->pipe_buffer.size) {
995						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
996							panic("Expected wraparound bad");
997						wpipe->pipe_buffer.in = size - segsize;
998					}
999
1000					wpipe->pipe_buffer.cnt += size;
1001					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1002						panic("Pipe buffer overflow");
1003
1004				}
1005				pipeunlock(wpipe);
1006			}
1007			if (error)
1008				break;
1009
1010		} else {
1011			/*
1012			 * If the "read-side" has been blocked, wake it up now.
1013			 */
1014			if (wpipe->pipe_state & PIPE_WANTR) {
1015				wpipe->pipe_state &= ~PIPE_WANTR;
1016				wakeup(wpipe);
1017			}
1018
1019			/*
1020			 * don't block on non-blocking I/O
1021			 */
1022			if (fp->f_flag & FNONBLOCK) {
1023				error = EAGAIN;
1024				break;
1025			}
1026
1027			/*
1028			 * We have no more space and have something to offer,
1029			 * wake up select/poll.
1030			 */
1031			pipeselwakeup(wpipe);
1032
1033			wpipe->pipe_state |= PIPE_WANTW;
1034			error = msleep(wpipe, PIPE_MTX(wpipe),
1035			    PRIBIO | PCATCH, "pipewr", 0);
1036			if (error != 0)
1037				break;
1038			/*
1039			 * If read side wants to go away, we just issue a signal
1040			 * to ourselves.
1041			 */
1042			if (wpipe->pipe_state & PIPE_EOF) {
1043				error = EPIPE;
1044				break;
1045			}
1046		}
1047	}
1048
1049	--wpipe->pipe_busy;
1050
1051	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1052		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1053		wakeup(wpipe);
1054	} else if (wpipe->pipe_buffer.cnt > 0) {
1055		/*
1056		 * If we have put any characters in the buffer, we wake up
1057		 * the reader.
1058		 */
1059		if (wpipe->pipe_state & PIPE_WANTR) {
1060			wpipe->pipe_state &= ~PIPE_WANTR;
1061			wakeup(wpipe);
1062		}
1063	}
1064
1065	/*
1066	 * Don't return EPIPE if I/O was successful
1067	 */
1068	if ((wpipe->pipe_buffer.cnt == 0) &&
1069	    (uio->uio_resid == 0) &&
1070	    (error == EPIPE)) {
1071		error = 0;
1072	}
1073
1074	if (error == 0)
1075		vfs_timestamp(&wpipe->pipe_mtime);
1076
1077	/*
1078	 * We have something to offer,
1079	 * wake up select/poll.
1080	 */
1081	if (wpipe->pipe_buffer.cnt)
1082		pipeselwakeup(wpipe);
1083
1084	PIPE_UNLOCK(wpipe);
1085	return (error);
1086}
1087
1088/*
1089 * we implement a very minimal set of ioctls for compatibility with sockets.
1090 */
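/*
 * Illustrative sketch (userland code, not part of this file) of how
 * these ioctls are typically used on a descriptor from pipe(2); the
 * variable names are examples only.
 *
 *	int fds[2], nready, on = 1, owner = getpid();
 *
 *	pipe(fds);
 *	ioctl(fds[0], FIOSETOWN, &owner);	recipient of SIGIO
 *	ioctl(fds[0], FIOASYNC, &on);		enable SIGIO delivery
 *	ioctl(fds[0], FIONREAD, &nready);	bytes currently readable
 */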
1091int
1092pipe_ioctl(fp, cmd, data, td)
1093	struct file *fp;
1094	u_long cmd;
1095	caddr_t data;
1096	struct thread *td;
1097{
1098	struct pipe *mpipe = (struct pipe *)fp->f_data;
1099
1100	switch (cmd) {
1101
1102	case FIONBIO:
1103		return (0);
1104
1105	case FIOASYNC:
1106		PIPE_LOCK(mpipe);
1107		if (*(int *)data) {
1108			mpipe->pipe_state |= PIPE_ASYNC;
1109		} else {
1110			mpipe->pipe_state &= ~PIPE_ASYNC;
1111		}
1112		PIPE_UNLOCK(mpipe);
1113		return (0);
1114
1115	case FIONREAD:
1116		PIPE_LOCK(mpipe);
1117		if (mpipe->pipe_state & PIPE_DIRECTW)
1118			*(int *)data = mpipe->pipe_map.cnt;
1119		else
1120			*(int *)data = mpipe->pipe_buffer.cnt;
1121		PIPE_UNLOCK(mpipe);
1122		return (0);
1123
1124	case FIOSETOWN:
1125		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1126
1127	case FIOGETOWN:
1128		*(int *)data = fgetown(mpipe->pipe_sigio);
1129		return (0);
1130
1131	/* This is deprecated, FIOSETOWN should be used instead. */
1132	case TIOCSPGRP:
1133		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1134
1135	/* This is deprecated, FIOGETOWN should be used instead. */
1136	case TIOCGPGRP:
1137		*(int *)data = -fgetown(mpipe->pipe_sigio);
1138		return (0);
1139
1140	}
1141	return (ENOTTY);
1142}
1143
1144int
1145pipe_poll(fp, events, cred, td)
1146	struct file *fp;
1147	int events;
1148	struct ucred *cred;
1149	struct thread *td;
1150{
1151	struct pipe *rpipe = (struct pipe *)fp->f_data;
1152	struct pipe *wpipe;
1153	int revents = 0;
1154
1155	wpipe = rpipe->pipe_peer;
1156	PIPE_LOCK(rpipe);
1157	if (events & (POLLIN | POLLRDNORM))
1158		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1159		    (rpipe->pipe_buffer.cnt > 0) ||
1160		    (rpipe->pipe_state & PIPE_EOF))
1161			revents |= events & (POLLIN | POLLRDNORM);
1162
1163	if (events & (POLLOUT | POLLWRNORM))
1164		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1165		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1166		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1167			revents |= events & (POLLOUT | POLLWRNORM);
1168
1169	if ((rpipe->pipe_state & PIPE_EOF) ||
1170	    (wpipe == NULL) ||
1171	    (wpipe->pipe_state & PIPE_EOF))
1172		revents |= POLLHUP;
1173
1174	if (revents == 0) {
1175		if (events & (POLLIN | POLLRDNORM)) {
1176			selrecord(td, &rpipe->pipe_sel);
1177			rpipe->pipe_state |= PIPE_SEL;
1178		}
1179
1180		if (events & (POLLOUT | POLLWRNORM)) {
1181			selrecord(td, &wpipe->pipe_sel);
1182			wpipe->pipe_state |= PIPE_SEL;
1183		}
1184	}
1185	PIPE_UNLOCK(rpipe);
1186
1187	return (revents);
1188}
1189
1190static int
1191pipe_stat(fp, ub, td)
1192	struct file *fp;
1193	struct stat *ub;
1194	struct thread *td;
1195{
1196	struct pipe *pipe = (struct pipe *)fp->f_data;
1197
1198	bzero((caddr_t)ub, sizeof(*ub));
1199	ub->st_mode = S_IFIFO;
1200	ub->st_blksize = pipe->pipe_buffer.size;
1201	ub->st_size = pipe->pipe_buffer.cnt;
1202	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1203	ub->st_atimespec = pipe->pipe_atime;
1204	ub->st_mtimespec = pipe->pipe_mtime;
1205	ub->st_ctimespec = pipe->pipe_ctime;
1206	ub->st_uid = fp->f_cred->cr_uid;
1207	ub->st_gid = fp->f_cred->cr_gid;
1208	/*
1209	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1210	 * XXX (st_dev, st_ino) should be unique.
1211	 */
1212	return (0);
1213}
1214
1215/* ARGSUSED */
1216static int
1217pipe_close(fp, td)
1218	struct file *fp;
1219	struct thread *td;
1220{
1221	struct pipe *cpipe = (struct pipe *)fp->f_data;
1222
1223	fp->f_ops = &badfileops;
1224	fp->f_data = NULL;
1225	funsetown(cpipe->pipe_sigio);
1226	pipeclose(cpipe);
1227	return (0);
1228}
1229
1230static void
1231pipe_free_kmem(cpipe)
1232	struct pipe *cpipe;
1233{
1234	GIANT_REQUIRED;
1235
1236	if (cpipe->pipe_buffer.buffer != NULL) {
1237		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1238			--nbigpipe;
1239		amountpipekva -= cpipe->pipe_buffer.size;
1240		kmem_free(kernel_map,
1241			(vm_offset_t)cpipe->pipe_buffer.buffer,
1242			cpipe->pipe_buffer.size);
1243		cpipe->pipe_buffer.buffer = NULL;
1244	}
1245#ifndef PIPE_NODIRECT
1246	if (cpipe->pipe_map.kva != 0) {
1247		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1248		kmem_free(kernel_map,
1249			cpipe->pipe_map.kva,
1250			cpipe->pipe_buffer.size + PAGE_SIZE);
1251		cpipe->pipe_map.cnt = 0;
1252		cpipe->pipe_map.kva = 0;
1253		cpipe->pipe_map.pos = 0;
1254		cpipe->pipe_map.npages = 0;
1255	}
1256#endif
1257}
1258
1259/*
1260 * shutdown the pipe
1261 */
1262static void
1263pipeclose(cpipe)
1264	struct pipe *cpipe;
1265{
1266	struct pipe *ppipe;
1267
1268	if (cpipe) {
1269		PIPE_LOCK(cpipe);
1270
1271		pipeselwakeup(cpipe);
1272
1273		/*
1274		 * If the other side is blocked, wake it up saying that
1275		 * we want to close it down.
1276		 */
1277		while (cpipe->pipe_busy) {
1278			wakeup(cpipe);
1279			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1280			msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1281		}
1282
1283		/*
1284		 * Disconnect from peer
1285		 */
1286		if ((ppipe = cpipe->pipe_peer) != NULL) {
1287			pipeselwakeup(ppipe);
1288
1289			ppipe->pipe_state |= PIPE_EOF;
1290			wakeup(ppipe);
1291			KNOTE(&ppipe->pipe_sel.si_note, 0);
1292			ppipe->pipe_peer = NULL;
1293		}
1294		/*
1295		 * free resources
1296		 */
1297		PIPE_UNLOCK(cpipe);
1298		mtx_lock(&Giant);
1299		pipe_free_kmem(cpipe);
1300		zfree(pipe_zone, cpipe);
1301		mtx_unlock(&Giant);
1302	}
1303}
1304
1305/*ARGSUSED*/
1306static int
1307pipe_kqfilter(struct file *fp, struct knote *kn)
1308{
1309	struct pipe *cpipe;
1310
1311	cpipe = (struct pipe *)kn->kn_fp->f_data;
1312	switch (kn->kn_filter) {
1313	case EVFILT_READ:
1314		kn->kn_fop = &pipe_rfiltops;
1315		break;
1316	case EVFILT_WRITE:
1317		kn->kn_fop = &pipe_wfiltops;
1318		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL)
			return (EPIPE);	/* other end of pipe has been closed */
1319		break;
1320	default:
1321		return (1);
1322	}
1323	kn->kn_hook = (caddr_t)cpipe;
1324
1325	PIPE_LOCK(cpipe);
1326	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1327	PIPE_UNLOCK(cpipe);
1328	return (0);
1329}
1330
1331static void
1332filt_pipedetach(struct knote *kn)
1333{
1334	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1335
1336	PIPE_LOCK(cpipe);
1337	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1338	PIPE_UNLOCK(cpipe);
1339}
1340
1341/*ARGSUSED*/
1342static int
1343filt_piperead(struct knote *kn, long hint)
1344{
1345	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1346	struct pipe *wpipe = rpipe->pipe_peer;
1347
1348	PIPE_LOCK(rpipe);
1349	kn->kn_data = rpipe->pipe_buffer.cnt;
1350	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1351		kn->kn_data = rpipe->pipe_map.cnt;
1352
1353	if ((rpipe->pipe_state & PIPE_EOF) ||
1354	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1355		kn->kn_flags |= EV_EOF;
1356		PIPE_UNLOCK(rpipe);
1357		return (1);
1358	}
1359	PIPE_UNLOCK(rpipe);
1360	return (kn->kn_data > 0);
1361}
1362
1363/*ARGSUSED*/
1364static int
1365filt_pipewrite(struct knote *kn, long hint)
1366{
1367	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1368	struct pipe *wpipe = rpipe->pipe_peer;
1369
1370	PIPE_LOCK(rpipe);
1371	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1372		kn->kn_data = 0;
1373		kn->kn_flags |= EV_EOF;
1374		PIPE_UNLOCK(rpipe);
1375		return (1);
1376	}
1377	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1378	if (wpipe->pipe_state & PIPE_DIRECTW)
1379		kn->kn_data = 0;
1380
1381	PIPE_UNLOCK(rpipe);
1382	return (kn->kn_data >= PIPE_BUF);
1383}
1384