sys_pipe.c revision 76827
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: head/sys/kern/sys_pipe.c 76827 2001-05-19 01:28:09Z alfred $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the user buffer is fully mapped and wired into
 * the kernel, and the receiving process can copy the data directly from
 * the pages of the sending process.
 *
 * If the sending process receives a signal, it may go away, and its
 * address space can certainly change, because control is returned to
 * the user-mode side.  In that case, the pipe code copies the buffer
 * supplied by the user process into a pageable kernel buffer, and the
 * receiving process grabs the data from there.  Since signals don't
 * happen all that often, the copy operation is normally avoided.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering is
 * used for small transfers, so that the system does not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the amount
 * of kernel virtual memory.
 */
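
/*
 * Illustrative sketch (not part of the kernel build): from userland both
 * modes are reached through the same write(2) call; the kernel picks the
 * path based on the size of the write.  This assumes a reader is draining
 * the pipe, since a large write blocks until the data is consumed.
 */
#if 0
#include <unistd.h>
#include <string.h>

static void
example_writes(int wfd)
{
	char small[128];		/* below PIPE_MINDIRECT: buffered */
	static char large[64 * 1024];	/* large blocking write: eligible
					 * for the direct, page-wired path */

	memset(small, 'a', sizeof(small));
	memset(large, 'b', sizeof(large));
	(void)write(wfd, small, sizeof(small));
	(void)write(wfd, large, sizeof(large));
}
#endif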

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/selinfo.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Define this if you want to disable the *fancy* VM tricks (the direct,
 * page-mapped writes).  Expect an approximately 30% decrease in transfer
 * rate.  This could be useful when porting to NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred, int flags, struct proc *p));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
		struct proc *p));
static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
static int pipe_stat __P((struct file *fp, struct stat *sb, struct proc *p));
static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct proc *p));

static struct fileops pipeops = {
	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
	pipe_stat, pipe_close
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };


/*
 * Default pipe buffer sizes; these can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
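
/*
 * Note: MINPIPESIZE drives the write-wakeup hysteresis in pipe_read();
 * a blocked writer is only awakened once the buffer has drained below
 * this watermark, which batches the reader/writer handoffs.
 */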

/*
 * Maximum amount of kva to use for pipes -- this is a soft limit, kept
 * so that we do not exhaust kernel virtual memory on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit on kva consumed by direct transfers; we cannot, of course, limit
 * the amount of kva used by pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipe_free_kmem __P((struct pipe *cpipe));
static int pipe_create __P((struct pipe **cpipep));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static int pipespace __P((struct pipe *cpipe, int size));

static vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
{
	struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);

	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	fhold(rf);
	p->p_retval[0] = fd;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_data = (caddr_t)rpipe;
	rf->f_ops = &pipeops;
	error = falloc(p, &wf, &fd);
	if (error) {
		if (fdp->fd_ofiles[p->p_retval[0]] == rf) {
			fdp->fd_ofiles[p->p_retval[0]] = NULL;
			fdrop(rf, p);
		}
		fdrop(rf, p);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_data = (caddr_t)wpipe;
	wf->f_ops = &pipeops;
	p->p_retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	fdrop(rf, p);

	return (0);
}
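
/*
 * Illustrative sketch (not part of the kernel build): classic userland use
 * of the pipe(2) syscall implemented above -- the parent writes, the child
 * reads.  A minimal example, assuming a POSIX environment:
 */
#if 0
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void
example_pipe(void)
{
	int fds[2];
	char buf[32];
	ssize_t n;

	if (pipe(fds) == -1)
		return;
	if (fork() == 0) {		/* child: read end */
		close(fds[1]);
		n = read(fds[0], buf, sizeof(buf) - 1);
		if (n > 0) {
			buf[n] = '\0';
			printf("got: %s\n", buf);
		}
		_exit(0);
	}
	close(fds[0]);			/* parent: write end */
	(void)write(fds[1], "hello", strlen("hello"));
	close(fds[1]);
}
#endif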

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails,
 * it retains the old buffer and returns ENOMEM.
 */
static int
pipespace(cpipe, size)
	struct pipe *cpipe;
	int size;
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	mtx_lock(&vm_mtx);
	object = vm_object_allocate(OBJT_DEFAULT, npages);
	buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, object, 0,
		(vm_offset_t *) &buffer, size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS) {
		vm_object_deallocate(object);
		mtx_unlock(&vm_mtx);
		return (ENOMEM);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(cpipe);
	mtx_unlock(&vm_mtx);
	cpipe->pipe_buffer.object = object;
	cpipe->pipe_buffer.buffer = buffer;
	cpipe->pipe_buffer.size = size;
	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	amountpipekva += cpipe->pipe_buffer.size;
	return (0);
}
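
/*
 * Note that pipespace() is used both at creation time (PIPE_SIZE) and by
 * pipe_write() to grow a quiescent pipe to BIG_PIPE_SIZE; on a resize
 * failure the old buffer simply remains in place.
 */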

/*
 * initialize and allocate VM and memory for pipe
 */
static int
pipe_create(cpipep)
	struct pipe **cpipep;
{
	struct pipe *cpipe;
	int error;

	*cpipep = zalloc(pipe_zone);
	if (*cpipep == NULL)
		return (ENOMEM);

	cpipe = *cpipep;

	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
	cpipe->pipe_buffer.object = NULL;
#ifndef PIPE_NODIRECT
	cpipe->pipe_map.kva = 0;	/* vm_offset_t, not a pointer */
#endif
	/*
	 * protect so pipeclose() doesn't follow a junk pointer
	 * if pipespace() fails.
	 */
	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif

	error = pipespace(cpipe, PIPE_SIZE);
	if (error)
		return (error);

	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;

	return (0);
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = tsleep(cpipe, catch ? (PRIBIO | PCATCH) : PRIBIO,
		    "pipelk", 0);
		if (error != 0)
			return (error);
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return (0);
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

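/*
 * Notify anyone waiting in select()/poll(), deliver SIGIO if the pipe
 * is in async mode, and activate any knotes attached to the pipe.
 */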
static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
	KNOTE(&cpipe->pipe_sel.si_note, 0);
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct proc *p;
	int flags;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	error = pipelock(rpipe, 1);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
			if (error)
				break;

			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t	va;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;

			va = (caddr_t) rpipe->pipe_map.kva +
			    rpipe->pipe_map.pos;
			error = uiomove(va, size, uio);
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 * read returns 0 on EOF, no need to set error
			 */
			if (rpipe->pipe_state & PIPE_EOF)
				break;

			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			/*
			 * Break if some data was read.
			 */
			if (nread > 0)
				break;

			/*
			 * Unlock the pipe buffer for our remaining
			 * processing.  We will either break out with an
			 * error or we will sleep and relock to loop.
			 */
			pipeunlock(rpipe);

			/*
			 * Handle non-blocking mode operation or
			 * wait for more data.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
			} else {
				rpipe->pipe_state |= PIPE_WANTR;
				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
					error = pipelock(rpipe, 1);
			}
			if (error)
				goto unlocked_error;
		}
	}
	pipeunlock(rpipe);

	if (error == 0)
		vfs_timestamp(&rpipe->pipe_atime);
unlocked_error:
	--rpipe->pipe_busy;

	/*
	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
	 */
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * Handle write blocking hysteresis.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return (error);
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
	mtx_lock(&vm_mtx);
	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
		vm_page_t m;

		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
		    (paddr = pmap_kextract(addr)) == 0) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
			mtx_unlock(&vm_mtx);
			return (EFAULT);
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos =
	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	mtx_unlock(&vm_mtx);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return (0);
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	/* take vm_mtx before touching the pmap and the kernel map */
	mtx_lock(&vm_mtx);
	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
	mtx_unlock(&vm_mtx);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer, size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return (error);

error1:
	wakeup(wpipe);
	return (error);
}
#endif
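
/*
 * Summary of the direct-write handshake: the writer wires its pages and
 * sets PIPE_DIRECTW; pipe_read() copies straight out of pipe_map and
 * clears PIPE_DIRECTW when the last byte is consumed; the writer sleeps
 * in "pipdwt" until the flag clears (success) or PIPE_EOF is set.
 */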

static int
pipe_write(fp, uio, cred, flags, p)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
	struct proc *p;
	int flags;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return (EPIPE);
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if ((error = pipelock(wpipe, 1)) == 0) {
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				nbigpipe++;
			pipeunlock(wpipe);
		} else {
			return (error);
		}
	}

	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 *
		 * The direct write mechanism will detect the reader going
		 * away on us.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error)
				break;
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.  We break out if a signal occurs or the
		 * reader goes away.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe, PRIBIO | PCATCH, "pipbww", 0);
			if (wpipe->pipe_state & PIPE_EOF)
				break;
			if (error)
				break;
		}
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			if ((error = pipelock(wpipe, 1)) == 0) {
				int size;	/* Transfer size */
				int segsize;	/* first segment to transfer */

				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				/*
				 * If a process blocked in uiomove, our
				 * value for space might be bad.
				 *
				 * XXX will we be ok if the reader has gone
				 * away here?
				 */
				if (space > wpipe->pipe_buffer.size -
				    wpipe->pipe_buffer.cnt) {
					pipeunlock(wpipe);
					goto retrywrite;
				}

				/*
				 * Transfer size is minimum of uio transfer
				 * and free space in pipe buffer.
				 */
				if (space > uio->uio_resid)
					size = uio->uio_resid;
				else
					size = space;
				/*
				 * First segment to transfer is minimum of
				 * transfer size and contiguous space in
				 * pipe buffer.  If first segment to transfer
				 * is less than the transfer size, we've got
				 * a wraparound in the buffer.
				 */
				segsize = wpipe->pipe_buffer.size -
					wpipe->pipe_buffer.in;
				if (segsize > size)
					segsize = size;

				/* Transfer first segment */

				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
						segsize, uio);

				if (error == 0 && segsize < size) {
					/*
					 * Transfer remaining part now, to
					 * support atomic writes.  Wraparound
					 * happened.
					 */
					if (wpipe->pipe_buffer.in + segsize !=
					    wpipe->pipe_buffer.size)
						panic("Expected pipe buffer wraparound disappeared");

					error = uiomove(&wpipe->pipe_buffer.buffer[0],
							size - segsize, uio);
				}
				if (error == 0) {
					wpipe->pipe_buffer.in += size;
					if (wpipe->pipe_buffer.in >=
					    wpipe->pipe_buffer.size) {
						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
							panic("Expected wraparound bad");
						wpipe->pipe_buffer.in = size - segsize;
					}

					wpipe->pipe_buffer.cnt += size;
					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
						panic("Pipe buffer overflow");

				}
				pipeunlock(wpipe);
			}
			if (error)
				break;

		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, PRIBIO | PCATCH, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return (error);
}
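
/*
 * Illustrative sketch (not part of the kernel build): the PIPE_BUF
 * guarantee enforced above means writes of at most PIPE_BUF bytes are
 * never interleaved with other writers' data, so several processes can
 * safely share one log pipe.  Assuming a POSIX environment:
 */
#if 0
#include <limits.h>
#include <string.h>
#include <unistd.h>

static void
log_line(int wfd, const char *line)
{
	size_t len = strlen(line);

	/*
	 * A single write() of <= PIPE_BUF bytes is atomic: readers see
	 * whole lines, never fragments from two writers mixed together.
	 */
	if (len <= PIPE_BUF)
		(void)write(wfd, line, len);
}
#endif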

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	u_long cmd;
	caddr_t data;
	struct proc *p;
{
	struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case FIOSETOWN:
		return (fsetown(*(int *)data, &mpipe->pipe_sigio));

	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		return (0);

	/* This is deprecated, FIOSETOWN should be used instead. */
	case TIOCSPGRP:
		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));

	/* This is deprecated, FIOGETOWN should be used instead. */
	case TIOCGPGRP:
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		return (0);

	}
	return (ENOTTY);
}
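
/*
 * Illustrative sketch (not part of the kernel build): querying the amount
 * of unread data with FIONREAD, which the handler above services from
 * either the direct-write map or the circular buffer:
 */
#if 0
#include <sys/ioctl.h>

static int
bytes_pending(int rfd)
{
	int n;

	if (ioctl(rfd, FIONREAD, &n) == -1)
		return (-1);
	return (n);
}
#endif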

static int
pipe_poll(fp, events, cred, p)
	struct file *fp;
	int events;
	struct ucred *cred;
	struct proc *p;
{
	struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	int revents = 0;

	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	if (events & (POLLOUT | POLLWRNORM))
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
			revents |= events & (POLLOUT | POLLWRNORM);

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_EOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(p, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(p, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}

	return (revents);
}
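
/*
 * Illustrative sketch (not part of the kernel build): waiting for a pipe
 * to become readable with poll(2), which lands in pipe_poll() above:
 */
#if 0
#include <poll.h>

static int
wait_readable(int rfd, int timeout_ms)
{
	struct pollfd pfd;

	pfd.fd = rfd;
	pfd.events = POLLIN;
	return (poll(&pfd, 1, timeout_ms));	/* >0: readable or POLLHUP */
}
#endif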

static int
pipe_stat(fp, ub, p)
	struct file *fp;
	struct stat *ub;
	struct proc *p;
{
	struct pipe *pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	return (0);
}

static void
pipe_free_kmem(cpipe)
	struct pipe *cpipe;
{

	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			--nbigpipe;
		amountpipekva -= cpipe->pipe_buffer.size;
		kmem_free(kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
	}
#ifndef PIPE_NODIRECT
	if (cpipe->pipe_map.kva != 0) {
		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
		kmem_free(kernel_map,
			cpipe->pipe_map.kva,
			cpipe->pipe_buffer.size + PAGE_SIZE);
		cpipe->pipe_map.cnt = 0;
		cpipe->pipe_map.kva = 0;
		cpipe->pipe_map.pos = 0;
		cpipe->pipe_map.npages = 0;
	}
#endif
}

/*
 * shut down the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}
		/*
		 * free resources
		 */
		mtx_lock(&vm_mtx);
		pipe_free_kmem(cpipe);
		zfree(pipe_zone, cpipe);
		mtx_unlock(&vm_mtx);
	}
}

/*ARGSUSED*/
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		break;
	default:
		return (1);
	}

	SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
	return (0);
}
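
/*
 * Illustrative sketch (not part of the kernel build): registering a kqueue
 * read filter on a pipe, serviced by pipe_kqfilter()/filt_piperead() above:
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

static int
wait_pipe_readable(int kq, int rfd)
{
	struct kevent kev;

	EV_SET(&kev, rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	/* Register and wait for one event; kev.data is the byte count. */
	return (kevent(kq, &kev, 1, &kev, 1, NULL));
}
#endif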

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	kn->kn_data = rpipe->pipe_buffer.cnt;
	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
		kn->kn_data = rpipe->pipe_map.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
	if (wpipe->pipe_state & PIPE_DIRECTW)
		kn->kn_data = 0;

	return (kn->kn_data >= PIPE_BUF);
}
