sys_pipe.c revision 79225
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 79225 2001-07-04 17:11:03Z dillon $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the amount written is less than PIPE_MINDIRECT,
33 * "normal" pipe buffering is done.  If it is PIPE_MINDIRECT or larger,
34 * the writer's buffer is fully mapped and wired into the kernel, and the
35 * receiving process can copy the data directly from the pages of the
36 * sending process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
51
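/*
 * Illustrative userland sketch (not part of this file's build): exercising
 * both of the paths described above.  The 128-byte write goes through the
 * kernel buffer; the single large write is a candidate for the direct,
 * page-wired path, assuming PIPE_MINDIRECT is its usual 8192 from
 * <sys/pipe.h>.  The program cannot observe which path was taken; it only
 * shows the write sizes involved.
 */
#if 0
#include <sys/types.h>
#include <sys/wait.h>
#include <err.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char small[128];		/* buffered ("small write") path */
	static char big[64 * 1024];	/* large enough for the direct path */
	pid_t pid;

	if (pipe(fds) == -1)
		err(1, "pipe");
	memset(small, 'a', sizeof(small));
	memset(big, 'b', sizeof(big));

	if ((pid = fork()) == -1)
		err(1, "fork");
	if (pid == 0) {
		/* child: drain the pipe and report how much arrived */
		char buf[8192];
		ssize_t n;
		size_t total = 0;

		close(fds[1]);
		while ((n = read(fds[0], buf, sizeof(buf))) > 0)
			total += n;
		printf("reader saw %zu bytes\n", total);
		_exit(0);
	}
	close(fds[0]);
	if (write(fds[1], small, sizeof(small)) == -1)
		err(1, "small write");
	if (write(fds[1], big, sizeof(big)) == -1)
		err(1, "big write");
	close(fds[1]);			/* reader sees EOF */
	waitpid(pid, NULL, 0);
	return (0);
}
#endif
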
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/lock.h>
59#include <sys/mutex.h>
60#include <sys/ttycom.h>
61#include <sys/stat.h>
62#include <sys/poll.h>
63#include <sys/selinfo.h>
64#include <sys/signalvar.h>
65#include <sys/sysproto.h>
66#include <sys/pipe.h>
67#include <sys/proc.h>
68#include <sys/vnode.h>
69#include <sys/uio.h>
70#include <sys/event.h>
71
72#include <vm/vm.h>
73#include <vm/vm_param.h>
74#include <vm/vm_object.h>
75#include <vm/vm_kern.h>
76#include <vm/vm_extern.h>
77#include <vm/pmap.h>
78#include <vm/vm_map.h>
79#include <vm/vm_page.h>
80#include <vm/vm_zone.h>
81
82/*
83 * Use this define if you want to disable *fancy* VM things.  Expect an
84 * approx 30% decrease in transfer rate.  This could be useful for
85 * NetBSD or OpenBSD.
86 */
87/* #define PIPE_NODIRECT */
88
89/*
90 * interfaces to the outside world
91 */
92static int pipe_read __P((struct file *fp, struct uio *uio,
93		struct ucred *cred, int flags, struct proc *p));
94static int pipe_write __P((struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct proc *p));
96static int pipe_close __P((struct file *fp, struct proc *p));
97static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
98		struct proc *p));
99static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
100static int pipe_stat __P((struct file *fp, struct stat *sb, struct proc *p));
101static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct proc *p));
102
103static struct fileops pipeops = {
104	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
105	pipe_stat, pipe_close
106};
107
108static void	filt_pipedetach(struct knote *kn);
109static int	filt_piperead(struct knote *kn, long hint);
110static int	filt_pipewrite(struct knote *kn, long hint);
111
112static struct filterops pipe_rfiltops =
113	{ 1, NULL, filt_pipedetach, filt_piperead };
114static struct filterops pipe_wfiltops =
115	{ 1, NULL, filt_pipedetach, filt_pipewrite };
116
117
118/*
119 * Default pipe buffer size(s); these can be fairly large now because pipe
120 * space is pageable.  The pipe code will try to maintain locality of
121 * reference for performance reasons, so small amounts of outstanding I/O
122 * will not wipe the cache.
123 */
124#define MINPIPESIZE (PIPE_SIZE/3)
125#define MAXPIPESIZE (2*PIPE_SIZE/3)
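
/*
 * A worked example (a sketch, assuming PIPE_SIZE is its usual 16384 from
 * <sys/pipe.h>): MINPIPESIZE is then 16384/3 = 5461 bytes and MAXPIPESIZE
 * is 2*16384/3 = 10922 bytes.  MINPIPESIZE is the low-water mark used in
 * pipe_read() below to decide when to wake a writer blocked on a full pipe.
 */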
126
127/*
128 * Maximum amount of kva for pipes -- this is a soft limit, but it is
129 * there so that we don't exhaust kernel virtual memory on large systems.
130 */
131#define MAXPIPEKVA (8*1024*1024)
132
133/*
134 * Limit for direct transfers; we cannot, of course, limit
135 * the amount of kva for pipes in general.
136 */
137#define LIMITPIPEKVA (16*1024*1024)
138
139/*
140 * Limit the number of "big" pipes
141 */
142#define LIMITBIGPIPES	32
143static int nbigpipe;
144
145static int amountpipekva;
146
147static void pipeclose __P((struct pipe *cpipe));
148static void pipe_free_kmem __P((struct pipe *cpipe));
149static int pipe_create __P((struct pipe **cpipep));
150static __inline int pipelock __P((struct pipe *cpipe, int catch));
151static __inline void pipeunlock __P((struct pipe *cpipe));
152static __inline void pipeselwakeup __P((struct pipe *cpipe));
153#ifndef PIPE_NODIRECT
154static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
155static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
156static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
157static void pipe_clone_write_buffer __P((struct pipe *wpipe));
158#endif
159static int pipespace __P((struct pipe *cpipe, int size));
160
161static vm_zone_t pipe_zone;
162
163/*
164 * The pipe system call for the DTYPE_PIPE type of pipes
165 */
166
167/* ARGSUSED */
168int
169pipe(p, uap)
170	struct proc *p;
171	struct pipe_args /* {
172		int	dummy;
173	} */ *uap;
174{
175	struct filedesc *fdp = p->p_fd;
176	struct file *rf, *wf;
177	struct pipe *rpipe, *wpipe;
178	int fd, error;
179
180	if (pipe_zone == NULL)
181		pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
182
183	rpipe = wpipe = NULL;
184	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
185		pipeclose(rpipe);
186		pipeclose(wpipe);
187		return (ENFILE);
188	}
189
190	rpipe->pipe_state |= PIPE_DIRECTOK;
191	wpipe->pipe_state |= PIPE_DIRECTOK;
192
193	error = falloc(p, &rf, &fd);
194	if (error) {
195		pipeclose(rpipe);
196		pipeclose(wpipe);
197		return (error);
198	}
199	fhold(rf);
200	p->p_retval[0] = fd;
201
202	/*
203	 * Warning: once we've gotten past allocation of the fd for the
204	 * read-side, we can only drop the read side via fdrop() in order
205	 * to avoid races against processes which manage to dup() the read
206	 * side while we are blocked trying to allocate the write side.
207	 */
208	rf->f_flag = FREAD | FWRITE;
209	rf->f_type = DTYPE_PIPE;
210	rf->f_data = (caddr_t)rpipe;
211	rf->f_ops = &pipeops;
212	error = falloc(p, &wf, &fd);
213	if (error) {
214		if (fdp->fd_ofiles[p->p_retval[0]] == rf) {
215			fdp->fd_ofiles[p->p_retval[0]] = NULL;
216			fdrop(rf, p);
217		}
218		fdrop(rf, p);
219		/* rpipe has been closed by fdrop(). */
220		pipeclose(wpipe);
221		return (error);
222	}
223	wf->f_flag = FREAD | FWRITE;
224	wf->f_type = DTYPE_PIPE;
225	wf->f_data = (caddr_t)wpipe;
226	wf->f_ops = &pipeops;
227	p->p_retval[1] = fd;
228
229	rpipe->pipe_peer = wpipe;
230	wpipe->pipe_peer = rpipe;
231	fdrop(rf, p);
232
233	return (0);
234}
235
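/*
 * Illustrative userland sketch (not part of this file's build): the classic
 * use of the pipe() system call implemented above, wiring two children
 * together shell-style with dup2(), roughly "ls | wc -l".
 */
#if 0
#include <sys/types.h>
#include <sys/wait.h>
#include <err.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	pid_t p1, p2;

	if (pipe(fds) == -1)
		err(1, "pipe");

	if ((p1 = fork()) == -1)
		err(1, "fork");
	if (p1 == 0) {
		dup2(fds[1], STDOUT_FILENO);	/* write end becomes stdout */
		close(fds[0]);
		close(fds[1]);
		execlp("ls", "ls", (char *)NULL);
		err(1, "execlp ls");
	}
	if ((p2 = fork()) == -1)
		err(1, "fork");
	if (p2 == 0) {
		dup2(fds[0], STDIN_FILENO);	/* read end becomes stdin */
		close(fds[0]);
		close(fds[1]);
		execlp("wc", "wc", "-l", (char *)NULL);
		err(1, "execlp wc");
	}
	close(fds[0]);
	close(fds[1]);
	waitpid(p1, NULL, 0);
	waitpid(p2, NULL, 0);
	return (0);
}
#endif
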
236/*
237 * Allocate kva for the pipe circular buffer; the space is pageable.
238 * This routine will 'realloc' the size of a pipe safely: if it fails,
239 * it will retain the old buffer.
240 * On failure it returns ENOMEM.
241 */
242static int
243pipespace(cpipe, size)
244	struct pipe *cpipe;
245	int size;
246{
247	struct vm_object *object;
248	caddr_t buffer;
249	int npages, error;
250
251	GIANT_REQUIRED;
252
253	npages = round_page(size)/PAGE_SIZE;
254	/*
255	 * Create an object; I don't like the idea of paging to/from
256	 * kernel_object.
257	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
258	 */
259	object = vm_object_allocate(OBJT_DEFAULT, npages);
260	buffer = (caddr_t) vm_map_min(kernel_map);
261
262	/*
263	 * Insert the object into the kernel map, and allocate kva for it.
264	 * The map entry is, by default, pageable.
265	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
266	 */
267	error = vm_map_find(kernel_map, object, 0,
268		(vm_offset_t *) &buffer, size, 1,
269		VM_PROT_ALL, VM_PROT_ALL, 0);
270
271	if (error != KERN_SUCCESS) {
272		vm_object_deallocate(object);
273		return (ENOMEM);
274	}
275
276	/* free old resources if we're resizing */
277	pipe_free_kmem(cpipe);
278	cpipe->pipe_buffer.object = object;
279	cpipe->pipe_buffer.buffer = buffer;
280	cpipe->pipe_buffer.size = size;
281	cpipe->pipe_buffer.in = 0;
282	cpipe->pipe_buffer.out = 0;
283	cpipe->pipe_buffer.cnt = 0;
284	amountpipekva += cpipe->pipe_buffer.size;
285	return (0);
286}
287
288/*
289 * initialize and allocate VM and memory for pipe
290 */
291static int
292pipe_create(cpipep)
293	struct pipe **cpipep;
294{
295	struct pipe *cpipe;
296	int error;
297
298	*cpipep = zalloc(pipe_zone);
299	if (*cpipep == NULL)
300		return (ENOMEM);
301
302	cpipe = *cpipep;
303
304	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
305	cpipe->pipe_buffer.object = NULL;
306#ifndef PIPE_NODIRECT
307	cpipe->pipe_map.kva = 0;
308#endif
309	/*
310	 * protect so pipeclose() doesn't follow a junk pointer
311	 * if pipespace() fails.
312	 */
313	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
314	cpipe->pipe_state = 0;
315	cpipe->pipe_peer = NULL;
316	cpipe->pipe_busy = 0;
317
318#ifndef PIPE_NODIRECT
319	/*
320	 * pipe data structure initializations to support direct pipe I/O
321	 */
322	cpipe->pipe_map.cnt = 0;
323	cpipe->pipe_map.kva = 0;
324	cpipe->pipe_map.pos = 0;
325	cpipe->pipe_map.npages = 0;
326	/* cpipe->pipe_map.ms[] = invalid */
327#endif
328
329	error = pipespace(cpipe, PIPE_SIZE);
330	if (error)
331		return (error);
332
333	vfs_timestamp(&cpipe->pipe_ctime);
334	cpipe->pipe_atime = cpipe->pipe_ctime;
335	cpipe->pipe_mtime = cpipe->pipe_ctime;
336
337	return (0);
338}
339
340
341/*
342 * lock a pipe for I/O, blocking other access
343 */
344static __inline int
345pipelock(cpipe, catch)
346	struct pipe *cpipe;
347	int catch;
348{
349	int error;
350
351	while (cpipe->pipe_state & PIPE_LOCK) {
352		cpipe->pipe_state |= PIPE_LWANT;
353		error = tsleep(cpipe, catch ? (PRIBIO | PCATCH) : PRIBIO,
354		    "pipelk", 0);
355		if (error != 0)
356			return (error);
357	}
358	cpipe->pipe_state |= PIPE_LOCK;
359	return (0);
360}
361
362/*
363 * unlock a pipe I/O lock
364 */
365static __inline void
366pipeunlock(cpipe)
367	struct pipe *cpipe;
368{
369
370	cpipe->pipe_state &= ~PIPE_LOCK;
371	if (cpipe->pipe_state & PIPE_LWANT) {
372		cpipe->pipe_state &= ~PIPE_LWANT;
373		wakeup(cpipe);
374	}
375}
376
377static __inline void
378pipeselwakeup(cpipe)
379	struct pipe *cpipe;
380{
381
382	if (cpipe->pipe_state & PIPE_SEL) {
383		cpipe->pipe_state &= ~PIPE_SEL;
384		selwakeup(&cpipe->pipe_sel);
385	}
386	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
387		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
388	KNOTE(&cpipe->pipe_sel.si_note, 0);
389}
390
391/* ARGSUSED */
392static int
393pipe_read(fp, uio, cred, flags, p)
394	struct file *fp;
395	struct uio *uio;
396	struct ucred *cred;
397	struct proc *p;
398	int flags;
399{
400	struct pipe *rpipe = (struct pipe *) fp->f_data;
401	int error;
402	int nread = 0;
403	u_int size;
404
405	++rpipe->pipe_busy;
406	error = pipelock(rpipe, 1);
407	if (error)
408		goto unlocked_error;
409
410	while (uio->uio_resid) {
411		/*
412		 * normal pipe buffer receive
413		 */
414		if (rpipe->pipe_buffer.cnt > 0) {
415			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
416			if (size > rpipe->pipe_buffer.cnt)
417				size = rpipe->pipe_buffer.cnt;
418			if (size > (u_int) uio->uio_resid)
419				size = (u_int) uio->uio_resid;
420
421			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
422					size, uio);
423			if (error)
424				break;
425
426			rpipe->pipe_buffer.out += size;
427			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
428				rpipe->pipe_buffer.out = 0;
429
430			rpipe->pipe_buffer.cnt -= size;
431
432			/*
433			 * If there is no more to read in the pipe, reset
434			 * its pointers to the beginning.  This improves
435			 * cache hit stats.
436			 */
437			if (rpipe->pipe_buffer.cnt == 0) {
438				rpipe->pipe_buffer.in = 0;
439				rpipe->pipe_buffer.out = 0;
440			}
441			nread += size;
442#ifndef PIPE_NODIRECT
443		/*
444		 * Direct copy, bypassing a kernel buffer.
445		 */
446		} else if ((size = rpipe->pipe_map.cnt) &&
447			   (rpipe->pipe_state & PIPE_DIRECTW)) {
448			caddr_t	va;
449			if (size > (u_int) uio->uio_resid)
450				size = (u_int) uio->uio_resid;
451
452			va = (caddr_t) rpipe->pipe_map.kva +
453			    rpipe->pipe_map.pos;
454			error = uiomove(va, size, uio);
455			if (error)
456				break;
457			nread += size;
458			rpipe->pipe_map.pos += size;
459			rpipe->pipe_map.cnt -= size;
460			if (rpipe->pipe_map.cnt == 0) {
461				rpipe->pipe_state &= ~PIPE_DIRECTW;
462				wakeup(rpipe);
463			}
464#endif
465		} else {
466			/*
467			 * detect EOF condition
468			 * read returns 0 on EOF, no need to set error
469			 */
470			if (rpipe->pipe_state & PIPE_EOF)
471				break;
472
473			/*
474			 * If the "write-side" has been blocked, wake it up now.
475			 */
476			if (rpipe->pipe_state & PIPE_WANTW) {
477				rpipe->pipe_state &= ~PIPE_WANTW;
478				wakeup(rpipe);
479			}
480
481			/*
482			 * Break if some data was read.
483			 */
484			if (nread > 0)
485				break;
486
487			/*
488			 * Unlock the pipe buffer for our remaining processing.  We
489			 * will either break out with an error or we will sleep and
490			 * relock to loop.
491			 */
492			pipeunlock(rpipe);
493
494			/*
495			 * Handle non-blocking mode operation or
496			 * wait for more data.
497			 */
498			if (fp->f_flag & FNONBLOCK) {
499				error = EAGAIN;
500			} else {
501				rpipe->pipe_state |= PIPE_WANTR;
502				if ((error = tsleep(rpipe, PRIBIO | PCATCH,
503				    "piperd", 0)) == 0)
504					error = pipelock(rpipe, 1);
505			}
506			if (error)
507				goto unlocked_error;
508		}
509	}
510	pipeunlock(rpipe);
511
512	if (error == 0)
513		vfs_timestamp(&rpipe->pipe_atime);
514unlocked_error:
515	--rpipe->pipe_busy;
516
517	/*
518	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
519	 */
520	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
521		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
522		wakeup(rpipe);
523	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
524		/*
525		 * Handle write blocking hysteresis.
526		 */
527		if (rpipe->pipe_state & PIPE_WANTW) {
528			rpipe->pipe_state &= ~PIPE_WANTW;
529			wakeup(rpipe);
530		}
531	}
532
533	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
534		pipeselwakeup(rpipe);
535
536	return (error);
537}
538
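/*
 * A minimal standalone sketch of the circular-buffer arithmetic used by
 * pipe_read() above and pipe_write() below: "in" is the write index, "out"
 * the read index, and "cnt" the number of bytes stored; indices wrap at
 * "size".  All names here are local to the sketch, not kernel interfaces.
 */
#if 0
#include <string.h>

struct ring {
	char	*buf;
	int	 size;		/* capacity of buf */
	int	 in;		/* next byte to be written */
	int	 out;		/* next byte to be read */
	int	 cnt;		/* bytes currently stored */
};

/* copy up to len bytes out of the ring, mirroring the pipe_read() loop */
static int
ring_read(struct ring *r, char *dst, int len)
{
	int chunk, done = 0;

	while (r->cnt > 0 && done < len) {
		/* largest contiguous run starting at 'out' */
		chunk = r->size - r->out;
		if (chunk > r->cnt)
			chunk = r->cnt;
		if (chunk > len - done)
			chunk = len - done;
		memcpy(dst + done, r->buf + r->out, chunk);
		r->out += chunk;
		if (r->out >= r->size)
			r->out = 0;
		r->cnt -= chunk;
		done += chunk;
	}
	if (r->cnt == 0)	/* same cache-friendly reset as pipe_read() */
		r->in = r->out = 0;
	return (done);
}
#endif
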
539#ifndef PIPE_NODIRECT
540/*
541 * Map the sending process's buffer into kernel space and wire it.
542 * This is similar to a physical write operation.
543 */
544static int
545pipe_build_write_buffer(wpipe, uio)
546	struct pipe *wpipe;
547	struct uio *uio;
548{
549	u_int size;
550	int i;
551	vm_offset_t addr, endaddr, paddr;
552
553	GIANT_REQUIRED;
554
555	size = (u_int) uio->uio_iov->iov_len;
556	if (size > wpipe->pipe_buffer.size)
557		size = wpipe->pipe_buffer.size;
558
559	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
560	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
561	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
562		vm_page_t m;
563
564		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
565		    (paddr = pmap_kextract(addr)) == 0) {
566			int j;
567
568			for (j = 0; j < i; j++)
569				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
570			return (EFAULT);
571		}
572
573		m = PHYS_TO_VM_PAGE(paddr);
574		vm_page_wire(m);
575		wpipe->pipe_map.ms[i] = m;
576	}
577
578/*
579 * set up the control block
580 */
581	wpipe->pipe_map.npages = i;
582	wpipe->pipe_map.pos =
583	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
584	wpipe->pipe_map.cnt = size;
585
586/*
587 * and map the buffer
588 */
589	if (wpipe->pipe_map.kva == 0) {
590		/*
591		 * We need to allocate space for an extra page because an
592		 * unaligned range can span one page more than its size suggests.
593		 */
594		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
595			wpipe->pipe_buffer.size + PAGE_SIZE);
596		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
597	}
598	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
599		wpipe->pipe_map.npages);
600
601/*
602 * and update the uio data
603 */
604
605	uio->uio_iov->iov_len -= size;
606	uio->uio_iov->iov_base += size;
607	if (uio->uio_iov->iov_len == 0)
608		uio->uio_iov++;
609	uio->uio_resid -= size;
610	uio->uio_offset += size;
611	return (0);
612}
613
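/*
 * A sketch of the page-range arithmetic performed above, with the kernel's
 * trunc_page()/round_page() spelled out (assuming a 4096-byte page).  An
 * unaligned buffer can touch one more page than size/PAGE_SIZE suggests,
 * which is why pipe_map.kva is allocated with one extra page.  The EX_*
 * names are local to the sketch.
 */
#if 0
#define EX_PAGE_SIZE		4096UL
#define ex_trunc_page(x)	((x) & ~(EX_PAGE_SIZE - 1))
#define ex_round_page(x)	(((x) + EX_PAGE_SIZE - 1) & ~(EX_PAGE_SIZE - 1))

/* number of pages spanned by [base, base + size) */
static unsigned long
ex_npages(unsigned long base, unsigned long size)
{
	unsigned long start = ex_trunc_page(base);
	unsigned long end = ex_round_page(base + size);

	return ((end - start) / EX_PAGE_SIZE);
}
/*
 * e.g. base = 0x1ffc, size = 8192: start = 0x1000, end = 0x4000, so three
 * pages are wired even though 8192 / 4096 == 2.
 */
#endif
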
614/*
615 * unmap and unwire the process buffer
616 */
617static void
618pipe_destroy_write_buffer(wpipe)
619	struct pipe *wpipe;
620{
621	int i;
622
623	GIANT_REQUIRED;
624
625	if (wpipe->pipe_map.kva) {
626		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
627
628		if (amountpipekva > MAXPIPEKVA) {
629			vm_offset_t kva = wpipe->pipe_map.kva;
630			wpipe->pipe_map.kva = 0;
631			kmem_free(kernel_map, kva,
632				wpipe->pipe_buffer.size + PAGE_SIZE);
633			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
634		}
635	}
636	for (i = 0; i < wpipe->pipe_map.npages; i++)
637		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
638}
639
640/*
641 * In the case of a signal, the writing process might go away.  This
642 * code copies the data into the circular buffer so that the source
643 * pages can be freed without loss of data.
644 */
645static void
646pipe_clone_write_buffer(wpipe)
647	struct pipe *wpipe;
648{
649	int size;
650	int pos;
651
652	size = wpipe->pipe_map.cnt;
653	pos = wpipe->pipe_map.pos;
654	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
655	    (caddr_t) wpipe->pipe_buffer.buffer, size);
656
657	wpipe->pipe_buffer.in = size;
658	wpipe->pipe_buffer.out = 0;
659	wpipe->pipe_buffer.cnt = size;
660	wpipe->pipe_state &= ~PIPE_DIRECTW;
661
662	pipe_destroy_write_buffer(wpipe);
663}
664
665/*
666 * This implements the direct pipe write mechanism.  Note that only
667 * a direct write OR a normal pipe write can be pending at any given time.
668 * If there are any characters in the pipe buffer, the direct write will
669 * be deferred until the receiving process grabs all of the bytes from
670 * the pipe buffer.  Only then is the direct mapping write set up.
671 */
672static int
673pipe_direct_write(wpipe, uio)
674	struct pipe *wpipe;
675	struct uio *uio;
676{
677	int error;
678
679retry:
680	while (wpipe->pipe_state & PIPE_DIRECTW) {
681		if (wpipe->pipe_state & PIPE_WANTR) {
682			wpipe->pipe_state &= ~PIPE_WANTR;
683			wakeup(wpipe);
684		}
685		wpipe->pipe_state |= PIPE_WANTW;
686		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdww", 0);
687		if (error)
688			goto error1;
689		if (wpipe->pipe_state & PIPE_EOF) {
690			error = EPIPE;
691			goto error1;
692		}
693	}
694	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
695	if (wpipe->pipe_buffer.cnt > 0) {
696		if (wpipe->pipe_state & PIPE_WANTR) {
697			wpipe->pipe_state &= ~PIPE_WANTR;
698			wakeup(wpipe);
699		}
700
701		wpipe->pipe_state |= PIPE_WANTW;
702		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwc", 0);
703		if (error)
704			goto error1;
705		if (wpipe->pipe_state & PIPE_EOF) {
706			error = EPIPE;
707			goto error1;
708		}
709		goto retry;
710	}
711
712	wpipe->pipe_state |= PIPE_DIRECTW;
713
714	error = pipe_build_write_buffer(wpipe, uio);
715	if (error) {
716		wpipe->pipe_state &= ~PIPE_DIRECTW;
717		goto error1;
718	}
719
720	error = 0;
721	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
722		if (wpipe->pipe_state & PIPE_EOF) {
723			pipelock(wpipe, 0);
724			pipe_destroy_write_buffer(wpipe);
725			pipeunlock(wpipe);
726			pipeselwakeup(wpipe);
727			error = EPIPE;
728			goto error1;
729		}
730		if (wpipe->pipe_state & PIPE_WANTR) {
731			wpipe->pipe_state &= ~PIPE_WANTR;
732			wakeup(wpipe);
733		}
734		pipeselwakeup(wpipe);
735		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
736	}
737
738	pipelock(wpipe, 0);
739	if (wpipe->pipe_state & PIPE_DIRECTW) {
740		/*
741		 * this bit of trickery substitutes a kernel buffer for
742		 * the process that might be going away.
743		 */
744		pipe_clone_write_buffer(wpipe);
745	} else {
746		pipe_destroy_write_buffer(wpipe);
747	}
748	pipeunlock(wpipe);
749	return (error);
750
751error1:
752	wakeup(wpipe);
753	return (error);
754}
755#endif
756
757static int
758pipe_write(fp, uio, cred, flags, p)
759	struct file *fp;
760	struct uio *uio;
761	struct ucred *cred;
762	struct proc *p;
763	int flags;
764{
765	int error = 0;
766	int orig_resid;
767	struct pipe *wpipe, *rpipe;
768
769	rpipe = (struct pipe *) fp->f_data;
770	wpipe = rpipe->pipe_peer;
771
772	/*
773	 * detect loss of pipe read side, issue SIGPIPE if lost.
774	 */
775	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
776		return (EPIPE);
777	}
778	++wpipe->pipe_busy;
779
780	/*
781	 * If it is advantageous to resize the pipe buffer, do
782	 * so.
783	 */
784	if ((uio->uio_resid > PIPE_SIZE) &&
785		(nbigpipe < LIMITBIGPIPES) &&
786		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
787		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
788		(wpipe->pipe_buffer.cnt == 0)) {
789
790		if ((error = pipelock(wpipe, 1)) == 0) {
791			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
792				nbigpipe++;
793			pipeunlock(wpipe);
794		}
795	}
796
797	/*
798	 * If an early error occurred, unbusy and return, waking up any pending
799	 * readers.
800	 */
801	if (error) {
802		--wpipe->pipe_busy;
803		if ((wpipe->pipe_busy == 0) &&
804		    (wpipe->pipe_state & PIPE_WANT)) {
805			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
806			wakeup(wpipe);
807		}
808		return(error);
809	}
810
811	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
812
813	orig_resid = uio->uio_resid;
814
815	while (uio->uio_resid) {
816		int space;
817
818#ifndef PIPE_NODIRECT
819		/*
820		 * If the transfer is large, we can gain performance if
821		 * we do process-to-process copies directly.
822		 * If the write is non-blocking, we don't use the
823		 * direct write mechanism.
824		 *
825		 * The direct write mechanism will detect the reader going
826		 * away on us.
827		 */
828		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
829		    (fp->f_flag & FNONBLOCK) == 0 &&
830		    (wpipe->pipe_map.kva ||
831		     (amountpipekva < LIMITPIPEKVA))) {
832			error = pipe_direct_write(wpipe, uio);
833			if (error)
834				break;
835			continue;
836		}
837#endif
838
839		/*
840		 * Pipe buffered writes cannot be coincident with
841		 * direct writes.  We wait until the currently executing
842		 * direct write is completed before we start filling the
843		 * pipe buffer.  We break out if a signal occurs or the
844		 * reader goes away.
845		 */
846	retrywrite:
847		while (wpipe->pipe_state & PIPE_DIRECTW) {
848			if (wpipe->pipe_state & PIPE_WANTR) {
849				wpipe->pipe_state &= ~PIPE_WANTR;
850				wakeup(wpipe);
851			}
852			error = tsleep(wpipe, PRIBIO | PCATCH, "pipbww", 0);
853			if (wpipe->pipe_state & PIPE_EOF)
854				break;
855			if (error)
856				break;
857		}
858		if (wpipe->pipe_state & PIPE_EOF) {
859			error = EPIPE;
860			break;
861		}
862
863		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
864
865		/* Writes of size <= PIPE_BUF must be atomic. */
866		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
867			space = 0;
868
869		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
870			if ((error = pipelock(wpipe, 1)) == 0) {
871				int size;	/* Transfer size */
872				int segsize;	/* first segment to transfer */
873
874				/*
875				 * It is possible for a direct write to
876				 * slip in on us... handle it here...
877				 */
878				if (wpipe->pipe_state & PIPE_DIRECTW) {
879					pipeunlock(wpipe);
880					goto retrywrite;
881				}
882				/*
883				 * If a process blocked in uiomove, our
884				 * value for space might be bad.
885				 *
886				 * XXX will we be ok if the reader has gone
887				 * away here?
888				 */
889				if (space > wpipe->pipe_buffer.size -
890				    wpipe->pipe_buffer.cnt) {
891					pipeunlock(wpipe);
892					goto retrywrite;
893				}
894
895				/*
896				 * Transfer size is minimum of uio transfer
897				 * and free space in pipe buffer.
898				 */
899				if (space > uio->uio_resid)
900					size = uio->uio_resid;
901				else
902					size = space;
903				/*
904				 * First segment to transfer is minimum of
905				 * transfer size and contiguous space in
906				 * pipe buffer.  If first segment to transfer
907				 * is less than the transfer size, we've got
908				 * a wraparound in the buffer.
909				 */
910				segsize = wpipe->pipe_buffer.size -
911					wpipe->pipe_buffer.in;
912				if (segsize > size)
913					segsize = size;
914
915				/* Transfer first segment */
916
917				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
918						segsize, uio);
919
920				if (error == 0 && segsize < size) {
921					/*
922					 * Transfer remaining part now, to
923					 * support atomic writes.  Wraparound
924					 * happened.
925					 */
926					if (wpipe->pipe_buffer.in + segsize !=
927					    wpipe->pipe_buffer.size)
928						panic("Expected pipe buffer wraparound disappeared");
929
930					error = uiomove(&wpipe->pipe_buffer.buffer[0],
931							size - segsize, uio);
932				}
933				if (error == 0) {
934					wpipe->pipe_buffer.in += size;
935					if (wpipe->pipe_buffer.in >=
936					    wpipe->pipe_buffer.size) {
937						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
938							panic("Expected wraparound bad");
939						wpipe->pipe_buffer.in = size - segsize;
940					}
941
942					wpipe->pipe_buffer.cnt += size;
943					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
944						panic("Pipe buffer overflow");
945
946				}
947				pipeunlock(wpipe);
948			}
949			if (error)
950				break;
951
952		} else {
953			/*
954			 * If the "read-side" has been blocked, wake it up now.
955			 */
956			if (wpipe->pipe_state & PIPE_WANTR) {
957				wpipe->pipe_state &= ~PIPE_WANTR;
958				wakeup(wpipe);
959			}
960
961			/*
962			 * don't block on non-blocking I/O
963			 */
964			if (fp->f_flag & FNONBLOCK) {
965				error = EAGAIN;
966				break;
967			}
968
969			/*
970			 * We have no more space and have something to offer,
971			 * wake up select/poll.
972			 */
973			pipeselwakeup(wpipe);
974
975			wpipe->pipe_state |= PIPE_WANTW;
976			error = tsleep(wpipe, PRIBIO | PCATCH, "pipewr", 0);
977			if (error != 0)
978				break;
979			/*
980			 * If read side wants to go away, we just issue a signal
981			 * to ourselves.
982			 */
983			if (wpipe->pipe_state & PIPE_EOF) {
984				error = EPIPE;
985				break;
986			}
987		}
988	}
989
990	--wpipe->pipe_busy;
991
992	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
993		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
994		wakeup(wpipe);
995	} else if (wpipe->pipe_buffer.cnt > 0) {
996		/*
997		 * If we have put any characters in the buffer, we wake up
998		 * the reader.
999		 */
1000		if (wpipe->pipe_state & PIPE_WANTR) {
1001			wpipe->pipe_state &= ~PIPE_WANTR;
1002			wakeup(wpipe);
1003		}
1004	}
1005
1006	/*
1007	 * Don't return EPIPE if I/O was successful
1008	 */
1009	if ((wpipe->pipe_buffer.cnt == 0) &&
1010	    (uio->uio_resid == 0) &&
1011	    (error == EPIPE)) {
1012		error = 0;
1013	}
1014
1015	if (error == 0)
1016		vfs_timestamp(&wpipe->pipe_mtime);
1017
1018	/*
1019	 * We have something to offer,
1020	 * wake up select/poll.
1021	 */
1022	if (wpipe->pipe_buffer.cnt)
1023		pipeselwakeup(wpipe);
1024
1025	return (error);
1026}
1027
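/*
 * Illustrative userland sketch (not part of this file's build) of two
 * properties of the write path above: writes of at most PIPE_BUF bytes are
 * never interleaved with other writers, and with O_NONBLOCK a full pipe
 * yields EAGAIN instead of sleeping in pipe_write().
 */
#if 0
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>		/* PIPE_BUF */
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char rec[PIPE_BUF];
	ssize_t n;

	if (pipe(fds) == -1)
		err(1, "pipe");
	if (fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
		err(1, "fcntl");
	memset(rec, 'x', sizeof(rec));

	/*
	 * Each record is exactly PIPE_BUF bytes, so it is written atomically
	 * or not at all.  Keep writing until the buffer fills; the
	 * non-blocking descriptor then returns -1/EAGAIN.
	 */
	for (;;) {
		n = write(fds[1], rec, sizeof(rec));
		if (n == -1) {
			if (errno == EAGAIN) {
				printf("pipe full, would block\n");
				break;
			}
			err(1, "write");
		}
	}
	return (0);
}
#endif
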
1028/*
1029 * we implement a very minimal set of ioctls for compatibility with sockets.
1030 */
1031int
1032pipe_ioctl(fp, cmd, data, p)
1033	struct file *fp;
1034	u_long cmd;
1035	caddr_t data;
1036	struct proc *p;
1037{
1038	struct pipe *mpipe = (struct pipe *)fp->f_data;
1039
1040	switch (cmd) {
1041
1042	case FIONBIO:
1043		return (0);
1044
1045	case FIOASYNC:
1046		if (*(int *)data) {
1047			mpipe->pipe_state |= PIPE_ASYNC;
1048		} else {
1049			mpipe->pipe_state &= ~PIPE_ASYNC;
1050		}
1051		return (0);
1052
1053	case FIONREAD:
1054		if (mpipe->pipe_state & PIPE_DIRECTW)
1055			*(int *)data = mpipe->pipe_map.cnt;
1056		else
1057			*(int *)data = mpipe->pipe_buffer.cnt;
1058		return (0);
1059
1060	case FIOSETOWN:
1061		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1062
1063	case FIOGETOWN:
1064		*(int *)data = fgetown(mpipe->pipe_sigio);
1065		return (0);
1066
1067	/* This is deprecated, FIOSETOWN should be used instead. */
1068	case TIOCSPGRP:
1069		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1070
1071	/* This is deprecated, FIOGETOWN should be used instead. */
1072	case TIOCGPGRP:
1073		*(int *)data = -fgetown(mpipe->pipe_sigio);
1074		return (0);
1075
1076	}
1077	return (ENOTTY);
1078}
1079
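/*
 * Illustrative userland sketch: querying the number of unread bytes in a
 * pipe via the FIONREAD ioctl handled above.
 */
#if 0
#include <sys/types.h>
#include <sys/ioctl.h>
#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	int nready;

	if (pipe(fds) == -1)
		err(1, "pipe");
	if (write(fds[1], "hello", 5) != 5)
		err(1, "write");
	if (ioctl(fds[0], FIONREAD, &nready) == -1)
		err(1, "ioctl");
	printf("%d bytes ready to read\n", nready);	/* prints 5 */
	return (0);
}
#endif
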
1080int
1081pipe_poll(fp, events, cred, p)
1082	struct file *fp;
1083	int events;
1084	struct ucred *cred;
1085	struct proc *p;
1086{
1087	struct pipe *rpipe = (struct pipe *)fp->f_data;
1088	struct pipe *wpipe;
1089	int revents = 0;
1090
1091	wpipe = rpipe->pipe_peer;
1092	if (events & (POLLIN | POLLRDNORM))
1093		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1094		    (rpipe->pipe_buffer.cnt > 0) ||
1095		    (rpipe->pipe_state & PIPE_EOF))
1096			revents |= events & (POLLIN | POLLRDNORM);
1097
1098	if (events & (POLLOUT | POLLWRNORM))
1099		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1100		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1101		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1102			revents |= events & (POLLOUT | POLLWRNORM);
1103
1104	if ((rpipe->pipe_state & PIPE_EOF) ||
1105	    (wpipe == NULL) ||
1106	    (wpipe->pipe_state & PIPE_EOF))
1107		revents |= POLLHUP;
1108
1109	if (revents == 0) {
1110		if (events & (POLLIN | POLLRDNORM)) {
1111			selrecord(p, &rpipe->pipe_sel);
1112			rpipe->pipe_state |= PIPE_SEL;
1113		}
1114
1115		if (events & (POLLOUT | POLLWRNORM)) {
1116			selrecord(p, &wpipe->pipe_sel);
1117			wpipe->pipe_state |= PIPE_SEL;
1118		}
1119	}
1120
1121	return (revents);
1122}
1123
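/*
 * Illustrative userland sketch of the poll() semantics implemented above:
 * the read descriptor becomes readable once data is buffered, and POLLHUP
 * is reported after the write side goes away.
 */
#if 0
#include <err.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	struct pollfd pfd;

	if (pipe(fds) == -1)
		err(1, "pipe");
	if (write(fds[1], "x", 1) != 1)
		err(1, "write");

	pfd.fd = fds[0];
	pfd.events = POLLIN;
	if (poll(&pfd, 1, 0) == -1)
		err(1, "poll");
	printf("revents after write: %#x\n", pfd.revents);	/* POLLIN */

	close(fds[1]);
	if (poll(&pfd, 1, 0) == -1)
		err(1, "poll");
	printf("revents after close: %#x\n", pfd.revents);	/* POLLIN|POLLHUP */
	return (0);
}
#endif
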
1124static int
1125pipe_stat(fp, ub, p)
1126	struct file *fp;
1127	struct stat *ub;
1128	struct proc *p;
1129{
1130	struct pipe *pipe = (struct pipe *)fp->f_data;
1131
1132	bzero((caddr_t)ub, sizeof(*ub));
1133	ub->st_mode = S_IFIFO;
1134	ub->st_blksize = pipe->pipe_buffer.size;
1135	ub->st_size = pipe->pipe_buffer.cnt;
1136	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1137	ub->st_atimespec = pipe->pipe_atime;
1138	ub->st_mtimespec = pipe->pipe_mtime;
1139	ub->st_ctimespec = pipe->pipe_ctime;
1140	ub->st_uid = fp->f_cred->cr_uid;
1141	ub->st_gid = fp->f_cred->cr_gid;
1142	/*
1143	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1144	 * XXX (st_dev, st_ino) should be unique.
1145	 */
1146	return (0);
1147}
1148
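/*
 * Illustrative userland sketch: fstat() on a pipe descriptor lands in
 * pipe_stat() above, so st_size reports the number of unread bytes.
 */
#if 0
#include <sys/types.h>
#include <sys/stat.h>
#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	struct stat sb;

	if (pipe(fds) == -1)
		err(1, "pipe");
	if (write(fds[1], "abc", 3) != 3)
		err(1, "write");
	if (fstat(fds[0], &sb) == -1)
		err(1, "fstat");
	printf("pending bytes: %ld\n", (long)sb.st_size);	/* 3 */
	return (0);
}
#endif
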
1149/* ARGSUSED */
1150static int
1151pipe_close(fp, p)
1152	struct file *fp;
1153	struct proc *p;
1154{
1155	struct pipe *cpipe = (struct pipe *)fp->f_data;
1156
1157	fp->f_ops = &badfileops;
1158	fp->f_data = NULL;
1159	funsetown(cpipe->pipe_sigio);
1160	pipeclose(cpipe);
1161	return (0);
1162}
1163
1164static void
1165pipe_free_kmem(cpipe)
1166	struct pipe *cpipe;
1167{
1168	GIANT_REQUIRED;
1169
1170	if (cpipe->pipe_buffer.buffer != NULL) {
1171		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1172			--nbigpipe;
1173		amountpipekva -= cpipe->pipe_buffer.size;
1174		kmem_free(kernel_map,
1175			(vm_offset_t)cpipe->pipe_buffer.buffer,
1176			cpipe->pipe_buffer.size);
1177		cpipe->pipe_buffer.buffer = NULL;
1178	}
1179#ifndef PIPE_NODIRECT
1180	if (cpipe->pipe_map.kva != 0) {
1181		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1182		kmem_free(kernel_map,
1183			cpipe->pipe_map.kva,
1184			cpipe->pipe_buffer.size + PAGE_SIZE);
1185		cpipe->pipe_map.cnt = 0;
1186		cpipe->pipe_map.kva = 0;
1187		cpipe->pipe_map.pos = 0;
1188		cpipe->pipe_map.npages = 0;
1189	}
1190#endif
1191}
1192
1193/*
1194 * shut down the pipe
1195 */
1196static void
1197pipeclose(cpipe)
1198	struct pipe *cpipe;
1199{
1200	struct pipe *ppipe;
1201
1202	if (cpipe) {
1203
1204		pipeselwakeup(cpipe);
1205
1206		/*
1207		 * If the other side is blocked, wake it up saying that
1208		 * we want to close it down.
1209		 */
1210		while (cpipe->pipe_busy) {
1211			wakeup(cpipe);
1212			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1213			tsleep(cpipe, PRIBIO, "pipecl", 0);
1214		}
1215
1216		/*
1217		 * Disconnect from peer
1218		 */
1219		if ((ppipe = cpipe->pipe_peer) != NULL) {
1220			pipeselwakeup(ppipe);
1221
1222			ppipe->pipe_state |= PIPE_EOF;
1223			wakeup(ppipe);
1224			ppipe->pipe_peer = NULL;
1225		}
1226		/*
1227		 * free resources
1228		 */
1229		pipe_free_kmem(cpipe);
1230		zfree(pipe_zone, cpipe);
1231	}
1232}
1233
1234/*ARGSUSED*/
1235static int
1236pipe_kqfilter(struct file *fp, struct knote *kn)
1237{
1238	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;
1239
1240	switch (kn->kn_filter) {
1241	case EVFILT_READ:
1242		kn->kn_fop = &pipe_rfiltops;
1243		break;
1244	case EVFILT_WRITE:
1245		kn->kn_fop = &pipe_wfiltops;
1246		cpipe = cpipe->pipe_peer;
1247		break;
1248	default:
1249		return (1);
1250	}
1251	kn->kn_hook = (caddr_t)cpipe;
1252
1253	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1254	return (0);
1255}
1256
1257static void
1258filt_pipedetach(struct knote *kn)
1259{
1260	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1261
1262	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1263}
1264
1265/*ARGSUSED*/
1266static int
1267filt_piperead(struct knote *kn, long hint)
1268{
1269	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1270	struct pipe *wpipe = rpipe->pipe_peer;
1271
1272	kn->kn_data = rpipe->pipe_buffer.cnt;
1273	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1274		kn->kn_data = rpipe->pipe_map.cnt;
1275
1276	if ((rpipe->pipe_state & PIPE_EOF) ||
1277	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1278		kn->kn_flags |= EV_EOF;
1279		return (1);
1280	}
1281	return (kn->kn_data > 0);
1282}
1283
1284/*ARGSUSED*/
1285static int
1286filt_pipewrite(struct knote *kn, long hint)
1287{
1288	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1289	struct pipe *wpipe = rpipe->pipe_peer;
1290
1291	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1292		kn->kn_data = 0;
1293		kn->kn_flags |= EV_EOF;
1294		return (1);
1295	}
1296	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1297	if (wpipe->pipe_state & PIPE_DIRECTW)
1298		kn->kn_data = 0;
1299
1300	return (kn->kn_data >= PIPE_BUF);
1301}
1302
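/*
 * Illustrative userland sketch of the knote filters above: registering
 * EVFILT_READ on a pipe with kqueue(2).  The byte count computed by
 * filt_piperead() comes back in the kevent data field.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <err.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2], kq, n;
	struct kevent kev;
	struct timespec ts = { 0, 0 };

	if (pipe(fds) == -1)
		err(1, "pipe");
	if ((kq = kqueue()) == -1)
		err(1, "kqueue");

	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
		err(1, "kevent register");

	if (write(fds[1], "data", 4) != 4)
		err(1, "write");

	n = kevent(kq, NULL, 0, &kev, 1, &ts);
	if (n == -1)
		err(1, "kevent wait");
	if (n > 0)
		printf("readable, %ld bytes pending\n", (long)kev.data);
	return (0);
}
#endif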