sys_pipe.c revision 91362
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 91362 2002-02-27 07:35:59Z alfred $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
35 * the receiving process can copy it directly from the pages in the sending
36 * process.
37 *
38 * If the sending process receives a signal, it may go away, and its
39 * address space can certainly change, because control is returned to
40 * the user-mode side.  In that case, the pipe code arranges to copy the
41 * buffer supplied by the user process into a pageable kernel buffer, and
42 * the receiving process then grabs the data from that pageable kernel
43 * buffer.  Since signals don't happen all that often, the copy operation
44 * is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
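
/*
 * Illustrative sketch, not part of this file: a minimal userland program
 * exercising both modes described above.  Writes smaller than
 * PIPE_MINDIRECT go through the kernel pipe buffer, while a blocking
 * write of at least PIPE_MINDIRECT bytes is a candidate for the direct,
 * page-wired path.  The 128k size below is an arbitrary example chosen
 * only to exceed the threshold.
 */
#if 0
#include <sys/types.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char small[] = "hello";
	static char big[128 * 1024];	/* assumed to be >= PIPE_MINDIRECT */
	char buf[4096];
	pid_t pid;
	ssize_t n;

	if (pipe(fds) == -1)
		exit(1);
	pid = fork();
	if (pid == 0) {
		/* Child: drain the pipe until EOF. */
		close(fds[1]);
		while ((n = read(fds[0], buf, sizeof(buf))) > 0)
			;
		_exit(0);
	}
	close(fds[0]);
	write(fds[1], small, sizeof(small));	/* small: buffered path */
	write(fds[1], big, sizeof(big));	/* large: direct-write candidate */
	close(fds[1]);
	waitpid(pid, NULL, 0);
	return (0);
}
#endif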
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/lock.h>
59#include <sys/mutex.h>
60#include <sys/ttycom.h>
61#include <sys/stat.h>
62#include <sys/poll.h>
63#include <sys/selinfo.h>
64#include <sys/signalvar.h>
65#include <sys/sysproto.h>
66#include <sys/pipe.h>
67#include <sys/proc.h>
68#include <sys/vnode.h>
69#include <sys/uio.h>
70#include <sys/event.h>
71
72#include <vm/vm.h>
73#include <vm/vm_param.h>
74#include <vm/vm_object.h>
75#include <vm/vm_kern.h>
76#include <vm/vm_extern.h>
77#include <vm/pmap.h>
78#include <vm/vm_map.h>
79#include <vm/vm_page.h>
80#include <vm/vm_zone.h>
81
82/*
83 * Use this define if you want to disable *fancy* VM things.  Expect an
84 * approx 30% decrease in transfer rate.  This could be useful for
85 * NetBSD or OpenBSD.
86 */
87/* #define PIPE_NODIRECT */
88
89/*
90 * interfaces to the outside world
91 */
92static int pipe_read __P((struct file *fp, struct uio *uio,
93		struct ucred *cred, int flags, struct thread *td));
94static int pipe_write __P((struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct thread *td));
96static int pipe_close __P((struct file *fp, struct thread *td));
97static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
98		struct thread *td));
99static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
100static int pipe_stat __P((struct file *fp, struct stat *sb, struct thread *td));
101static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct thread *td));
102
103static struct fileops pipeops = {
104	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
105	pipe_stat, pipe_close
106};
107
108static void	filt_pipedetach(struct knote *kn);
109static int	filt_piperead(struct knote *kn, long hint);
110static int	filt_pipewrite(struct knote *kn, long hint);
111
112static struct filterops pipe_rfiltops =
113	{ 1, NULL, filt_pipedetach, filt_piperead };
114static struct filterops pipe_wfiltops =
115	{ 1, NULL, filt_pipedetach, filt_pipewrite };
116
117#define PIPE_GET_GIANT(pipe)						\
118	do {								\
119		PIPE_UNLOCK(pipe);					\
120		mtx_lock(&Giant);					\
121	} while (0)
122
123#define PIPE_DROP_GIANT(pipe)						\
124	do {								\
125		mtx_unlock(&Giant);					\
126		PIPE_LOCK(pipe);					\
127	} while (0)
128
129/*
130 * Default pipe buffer size(s); this can be kind-of large now because pipe
131 * space is pageable.  The pipe code will try to maintain locality of
132 * reference for performance reasons, so small amounts of outstanding I/O
133 * will not wipe the cache.
134 */
135#define MINPIPESIZE (PIPE_SIZE/3)
136#define MAXPIPESIZE (2*PIPE_SIZE/3)
137
138/*
139 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
140 * is there so that on large systems, we don't exhaust it.
141 */
142#define MAXPIPEKVA (8*1024*1024)
143
144/*
145 * Limit for direct transfers; we cannot, of course, limit
146 * the amount of kva for pipes in general, though.
147 */
148#define LIMITPIPEKVA (16*1024*1024)
149
150/*
151 * Limit the number of "big" pipes
152 */
153#define LIMITBIGPIPES	32
154static int nbigpipe;
155
156static int amountpipekva;
157
158static void pipeclose __P((struct pipe *cpipe));
159static void pipe_free_kmem __P((struct pipe *cpipe));
160static int pipe_create __P((struct pipe **cpipep));
161static __inline int pipelock __P((struct pipe *cpipe, int catch));
162static __inline void pipeunlock __P((struct pipe *cpipe));
163static __inline void pipeselwakeup __P((struct pipe *cpipe));
164#ifndef PIPE_NODIRECT
165static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
166static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
167static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
168static void pipe_clone_write_buffer __P((struct pipe *wpipe));
169#endif
170static int pipespace __P((struct pipe *cpipe, int size));
171
172static vm_zone_t pipe_zone;
173
174/*
175 * The pipe system call for the DTYPE_PIPE type of pipes
176 */
177
178/* ARGSUSED */
179int
180pipe(td, uap)
181	struct thread *td;
182	struct pipe_args /* {
183		int	dummy;
184	} */ *uap;
185{
186	struct filedesc *fdp = td->td_proc->p_fd;
187	struct file *rf, *wf;
188	struct pipe *rpipe, *wpipe;
189	int fd, error;
190
191	/* XXX: SYSINIT this! */
192	if (pipe_zone == NULL)
193		pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
194
195	rpipe = wpipe = NULL;
196	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
197		pipeclose(rpipe);
198		pipeclose(wpipe);
199		return (ENFILE);
200	}
201
202	rpipe->pipe_state |= PIPE_DIRECTOK;
203	wpipe->pipe_state |= PIPE_DIRECTOK;
204
205	error = falloc(td, &rf, &fd);
206	if (error) {
207		pipeclose(rpipe);
208		pipeclose(wpipe);
209		return (error);
210	}
211	fhold(rf);
212	td->td_retval[0] = fd;
213
214	/*
215	 * Warning: once we've gotten past allocation of the fd for the
216	 * read-side, we can only drop the read side via fdrop() in order
217	 * to avoid races against processes which manage to dup() the read
218	 * side while we are blocked trying to allocate the write side.
219	 */
220	FILE_LOCK(rf);
221	rf->f_flag = FREAD | FWRITE;
222	rf->f_type = DTYPE_PIPE;
223	rf->f_data = (caddr_t)rpipe;
224	rf->f_ops = &pipeops;
225	FILE_UNLOCK(rf);
226	error = falloc(td, &wf, &fd);
227	if (error) {
228		FILEDESC_LOCK(fdp);
229		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
230			fdp->fd_ofiles[td->td_retval[0]] = NULL;
231			FILEDESC_UNLOCK(fdp);
232			fdrop(rf, td);
233		} else
234			FILEDESC_UNLOCK(fdp);
235		fdrop(rf, td);
236		/* rpipe has been closed by fdrop(). */
237		pipeclose(wpipe);
238		return (error);
239	}
240	FILE_LOCK(wf);
241	wf->f_flag = FREAD | FWRITE;
242	wf->f_type = DTYPE_PIPE;
243	wf->f_data = (caddr_t)wpipe;
244	wf->f_ops = &pipeops;
245	FILE_UNLOCK(wf);
246	td->td_retval[1] = fd;
247	rpipe->pipe_peer = wpipe;
248	wpipe->pipe_peer = rpipe;
249	rpipe->pipe_mtxp = wpipe->pipe_mtxp = mtx_pool_alloc();
250	fdrop(rf, td);
251
252	return (0);
253}
254
255/*
256 * Allocate kva for the pipe circular buffer; the space is pageable.
257 * This routine will 'realloc' the size of a pipe safely: if the
258 * allocation fails, it retains the old buffer.
259 * In that case it returns ENOMEM.
260 */
261static int
262pipespace(cpipe, size)
263	struct pipe *cpipe;
264	int size;
265{
266	struct vm_object *object;
267	caddr_t buffer;
268	int npages, error;
269
270	GIANT_REQUIRED;
271
272	npages = round_page(size)/PAGE_SIZE;
273	/*
274	 * Create an object; I don't like the idea of paging to/from
275	 * kernel_object.
276	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
277	 */
278	object = vm_object_allocate(OBJT_DEFAULT, npages);
279	buffer = (caddr_t) vm_map_min(kernel_map);
280
281	/*
282	 * Insert the object into the kernel map, and allocate kva for it.
283	 * The map entry is, by default, pageable.
284	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
285	 */
286	error = vm_map_find(kernel_map, object, 0,
287		(vm_offset_t *) &buffer, size, 1,
288		VM_PROT_ALL, VM_PROT_ALL, 0);
289
290	if (error != KERN_SUCCESS) {
291		vm_object_deallocate(object);
292		return (ENOMEM);
293	}
294
295	/* free old resources if we're resizing */
296	pipe_free_kmem(cpipe);
297	cpipe->pipe_buffer.object = object;
298	cpipe->pipe_buffer.buffer = buffer;
299	cpipe->pipe_buffer.size = size;
300	cpipe->pipe_buffer.in = 0;
301	cpipe->pipe_buffer.out = 0;
302	cpipe->pipe_buffer.cnt = 0;
303	amountpipekva += cpipe->pipe_buffer.size;
304	return (0);
305}
306
307/*
308 * initialize and allocate VM and memory for pipe
309 */
310static int
311pipe_create(cpipep)
312	struct pipe **cpipep;
313{
314	struct pipe *cpipe;
315	int error;
316
317	*cpipep = zalloc(pipe_zone);
318	if (*cpipep == NULL)
319		return (ENOMEM);
320
321	cpipe = *cpipep;
322
323	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
324	cpipe->pipe_buffer.object = NULL;
325#ifndef PIPE_NODIRECT
326	cpipe->pipe_map.kva = 0;
327#endif
328	/*
329	 * protect so pipeclose() doesn't follow a junk pointer
330	 * if pipespace() fails.
331	 */
332	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
333	cpipe->pipe_state = 0;
334	cpipe->pipe_peer = NULL;
335	cpipe->pipe_busy = 0;
336
337#ifndef PIPE_NODIRECT
338	/*
339	 * pipe data structure initializations to support direct pipe I/O
340	 */
341	cpipe->pipe_map.cnt = 0;
342	cpipe->pipe_map.kva = 0;
343	cpipe->pipe_map.pos = 0;
344	cpipe->pipe_map.npages = 0;
345	/* cpipe->pipe_map.ms[] = invalid */
346#endif
347
348	error = pipespace(cpipe, PIPE_SIZE);
349	if (error)
350		return (error);
351
352	vfs_timestamp(&cpipe->pipe_ctime);
353	cpipe->pipe_atime = cpipe->pipe_ctime;
354	cpipe->pipe_mtime = cpipe->pipe_ctime;
355
356	return (0);
357}
358
359
360/*
361 * lock a pipe for I/O, blocking other access
362 */
363static __inline int
364pipelock(cpipe, catch)
365	struct pipe *cpipe;
366	int catch;
367{
368	int error;
369
370	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
371	while (cpipe->pipe_state & PIPE_LOCKFL) {
372		cpipe->pipe_state |= PIPE_LWANT;
373		error = msleep(cpipe, PIPE_MTX(cpipe),
374		    catch ? (PRIBIO | PCATCH) : PRIBIO,
375		    "pipelk", 0);
376		if (error != 0)
377			return (error);
378	}
379	cpipe->pipe_state |= PIPE_LOCKFL;
380	return (0);
381}
382
383/*
384 * unlock a pipe I/O lock
385 */
386static __inline void
387pipeunlock(cpipe)
388	struct pipe *cpipe;
389{
390
391	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
392	cpipe->pipe_state &= ~PIPE_LOCKFL;
393	if (cpipe->pipe_state & PIPE_LWANT) {
394		cpipe->pipe_state &= ~PIPE_LWANT;
395		wakeup(cpipe);
396	}
397}
398
399static __inline void
400pipeselwakeup(cpipe)
401	struct pipe *cpipe;
402{
403
404	if (cpipe->pipe_state & PIPE_SEL) {
405		cpipe->pipe_state &= ~PIPE_SEL;
406		selwakeup(&cpipe->pipe_sel);
407	}
408	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
409		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
410	KNOTE(&cpipe->pipe_sel.si_note, 0);
411}
412
413/* ARGSUSED */
414static int
415pipe_read(fp, uio, cred, flags, td)
416	struct file *fp;
417	struct uio *uio;
418	struct ucred *cred;
419	struct thread *td;
420	int flags;
421{
422	struct pipe *rpipe = (struct pipe *) fp->f_data;
423	int error;
424	int nread = 0;
425	u_int size;
426
427	PIPE_LOCK(rpipe);
428	++rpipe->pipe_busy;
429	error = pipelock(rpipe, 1);
430	if (error)
431		goto unlocked_error;
432
433	while (uio->uio_resid) {
434		/*
435		 * normal pipe buffer receive
436		 */
437		if (rpipe->pipe_buffer.cnt > 0) {
438			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
439			if (size > rpipe->pipe_buffer.cnt)
440				size = rpipe->pipe_buffer.cnt;
441			if (size > (u_int) uio->uio_resid)
442				size = (u_int) uio->uio_resid;
443
444			PIPE_UNLOCK(rpipe);
445			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
446					size, uio);
447			PIPE_LOCK(rpipe);
448			if (error)
449				break;
450
451			rpipe->pipe_buffer.out += size;
452			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
453				rpipe->pipe_buffer.out = 0;
454
455			rpipe->pipe_buffer.cnt -= size;
456
457			/*
458			 * If there is no more to read in the pipe, reset
459			 * its pointers to the beginning.  This improves
460			 * cache hit stats.
461			 */
462			if (rpipe->pipe_buffer.cnt == 0) {
463				rpipe->pipe_buffer.in = 0;
464				rpipe->pipe_buffer.out = 0;
465			}
466			nread += size;
467#ifndef PIPE_NODIRECT
468		/*
469		 * Direct copy, bypassing a kernel buffer.
470		 */
471		} else if ((size = rpipe->pipe_map.cnt) &&
472			   (rpipe->pipe_state & PIPE_DIRECTW)) {
473			caddr_t	va;
474			if (size > (u_int) uio->uio_resid)
475				size = (u_int) uio->uio_resid;
476
477			va = (caddr_t) rpipe->pipe_map.kva +
478			    rpipe->pipe_map.pos;
479			PIPE_UNLOCK(rpipe);
480			error = uiomove(va, size, uio);
481			PIPE_LOCK(rpipe);
482			if (error)
483				break;
484			nread += size;
485			rpipe->pipe_map.pos += size;
486			rpipe->pipe_map.cnt -= size;
487			if (rpipe->pipe_map.cnt == 0) {
488				rpipe->pipe_state &= ~PIPE_DIRECTW;
489				wakeup(rpipe);
490			}
491#endif
492		} else {
493			/*
494			 * detect EOF condition
495			 * read returns 0 on EOF, no need to set error
496			 */
497			if (rpipe->pipe_state & PIPE_EOF)
498				break;
499
500			/*
501			 * If the "write-side" has been blocked, wake it up now.
502			 */
503			if (rpipe->pipe_state & PIPE_WANTW) {
504				rpipe->pipe_state &= ~PIPE_WANTW;
505				wakeup(rpipe);
506			}
507
508			/*
509			 * Break if some data was read.
510			 */
511			if (nread > 0)
512				break;
513
514			/*
515			 * Unlock the pipe buffer for our remaining processing.  We
516			 * will either break out with an error or we will sleep and
517			 * relock to loop.
518			 */
519			pipeunlock(rpipe);
520
521			/*
522			 * Handle non-blocking mode operation or
523			 * wait for more data.
524			 */
525			if (fp->f_flag & FNONBLOCK) {
526				error = EAGAIN;
527			} else {
528				rpipe->pipe_state |= PIPE_WANTR;
529				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
530				    PRIBIO | PCATCH,
531				    "piperd", 0)) == 0)
532					error = pipelock(rpipe, 1);
533			}
534			if (error)
535				goto unlocked_error;
536		}
537	}
538	pipeunlock(rpipe);
539
540	/* XXX: should probably do this before getting any locks. */
541	if (error == 0)
542		vfs_timestamp(&rpipe->pipe_atime);
543unlocked_error:
544	--rpipe->pipe_busy;
545
546	/*
547	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
548	 */
549	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
550		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
551		wakeup(rpipe);
552	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
553		/*
554		 * Handle write blocking hysteresis.
555		 */
556		if (rpipe->pipe_state & PIPE_WANTW) {
557			rpipe->pipe_state &= ~PIPE_WANTW;
558			wakeup(rpipe);
559		}
560	}
561
562	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
563		pipeselwakeup(rpipe);
564
565	PIPE_UNLOCK(rpipe);
566	return (error);
567}
568
569#ifndef PIPE_NODIRECT
570/*
571 * Map the sending process's buffer into kernel space and wire it.
572 * This is similar to a physical write operation.
573 */
574static int
575pipe_build_write_buffer(wpipe, uio)
576	struct pipe *wpipe;
577	struct uio *uio;
578{
579	u_int size;
580	int i;
581	vm_offset_t addr, endaddr, paddr;
582
583	GIANT_REQUIRED;
584
585	size = (u_int) uio->uio_iov->iov_len;
586	if (size > wpipe->pipe_buffer.size)
587		size = wpipe->pipe_buffer.size;
588
589	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
590	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
591	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
592		vm_page_t m;
593
594		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
595		    (paddr = pmap_kextract(addr)) == 0) {
596			int j;
597
598			for (j = 0; j < i; j++)
599				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
600			return (EFAULT);
601		}
602
603		m = PHYS_TO_VM_PAGE(paddr);
604		vm_page_wire(m);
605		wpipe->pipe_map.ms[i] = m;
606	}
607
608/*
609 * set up the control block
610 */
611	wpipe->pipe_map.npages = i;
612	wpipe->pipe_map.pos =
613	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
614	wpipe->pipe_map.cnt = size;
615
616/*
617 * and map the buffer
618 */
619	if (wpipe->pipe_map.kva == 0) {
620		/*
621		 * We need to allocate space for an extra page because the
622		 * address range might (will) span pages at times.
623		 */
624		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
625			wpipe->pipe_buffer.size + PAGE_SIZE);
626		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
627	}
628	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
629		wpipe->pipe_map.npages);
630
631/*
632 * and update the uio data
633 */
634
635	uio->uio_iov->iov_len -= size;
636	uio->uio_iov->iov_base += size;
637	if (uio->uio_iov->iov_len == 0)
638		uio->uio_iov++;
639	uio->uio_resid -= size;
640	uio->uio_offset += size;
641	return (0);
642}
643
644/*
645 * unmap and unwire the process buffer
646 */
647static void
648pipe_destroy_write_buffer(wpipe)
649	struct pipe *wpipe;
650{
651	int i;
652
653	GIANT_REQUIRED;
654
655	if (wpipe->pipe_map.kva) {
656		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
657
658		if (amountpipekva > MAXPIPEKVA) {
659			vm_offset_t kva = wpipe->pipe_map.kva;
660			wpipe->pipe_map.kva = 0;
661			kmem_free(kernel_map, kva,
662				wpipe->pipe_buffer.size + PAGE_SIZE);
663			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
664		}
665	}
666	for (i = 0; i < wpipe->pipe_map.npages; i++)
667		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
668}
669
670/*
671 * In the case of a signal, the writing process might go away.  This
672 * code copies the data into the circular buffer so that the source
673 * pages can be freed without loss of data.
674 */
675static void
676pipe_clone_write_buffer(wpipe)
677	struct pipe *wpipe;
678{
679	int size;
680	int pos;
681
682	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
683	size = wpipe->pipe_map.cnt;
684	pos = wpipe->pipe_map.pos;
685	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
686	    (caddr_t) wpipe->pipe_buffer.buffer, size);
687
688	wpipe->pipe_buffer.in = size;
689	wpipe->pipe_buffer.out = 0;
690	wpipe->pipe_buffer.cnt = size;
691	wpipe->pipe_state &= ~PIPE_DIRECTW;
692
693	pipe_destroy_write_buffer(wpipe);
694}
695
696/*
697 * This implements the pipe buffer write mechanism.  Note that only
698 * a direct write OR a normal pipe write can be pending at any given time.
699 * If there are any characters in the pipe buffer, the direct write will
700 * be deferred until the receiving process grabs all of the bytes from
701 * the pipe buffer.  Then the direct mapping write is set up.
702 */
703static int
704pipe_direct_write(wpipe, uio)
705	struct pipe *wpipe;
706	struct uio *uio;
707{
708	int error;
709
710retry:
711	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
712	while (wpipe->pipe_state & PIPE_DIRECTW) {
713		if (wpipe->pipe_state & PIPE_WANTR) {
714			wpipe->pipe_state &= ~PIPE_WANTR;
715			wakeup(wpipe);
716		}
717		wpipe->pipe_state |= PIPE_WANTW;
718		error = msleep(wpipe, PIPE_MTX(wpipe),
719		    PRIBIO | PCATCH, "pipdww", 0);
720		if (error)
721			goto error1;
722		if (wpipe->pipe_state & PIPE_EOF) {
723			error = EPIPE;
724			goto error1;
725		}
726	}
727	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
728	if (wpipe->pipe_buffer.cnt > 0) {
729		if (wpipe->pipe_state & PIPE_WANTR) {
730			wpipe->pipe_state &= ~PIPE_WANTR;
731			wakeup(wpipe);
732		}
733
734		wpipe->pipe_state |= PIPE_WANTW;
735		error = msleep(wpipe, PIPE_MTX(wpipe),
736		    PRIBIO | PCATCH, "pipdwc", 0);
737		if (error)
738			goto error1;
739		if (wpipe->pipe_state & PIPE_EOF) {
740			error = EPIPE;
741			goto error1;
742		}
743		goto retry;
744	}
745
746	wpipe->pipe_state |= PIPE_DIRECTW;
747
748	PIPE_GET_GIANT(wpipe);
749	error = pipe_build_write_buffer(wpipe, uio);
750	PIPE_DROP_GIANT(wpipe);
751	if (error) {
752		wpipe->pipe_state &= ~PIPE_DIRECTW;
753		goto error1;
754	}
755
756	error = 0;
757	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
758		if (wpipe->pipe_state & PIPE_EOF) {
759			pipelock(wpipe, 0);
760			PIPE_GET_GIANT(wpipe);
761			pipe_destroy_write_buffer(wpipe);
762			PIPE_DROP_GIANT(wpipe);
763			pipeunlock(wpipe);
764			pipeselwakeup(wpipe);
765			error = EPIPE;
766			goto error1;
767		}
768		if (wpipe->pipe_state & PIPE_WANTR) {
769			wpipe->pipe_state &= ~PIPE_WANTR;
770			wakeup(wpipe);
771		}
772		pipeselwakeup(wpipe);
773		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
774		    "pipdwt", 0);
775	}
776
777	pipelock(wpipe, 0);
778	if (wpipe->pipe_state & PIPE_DIRECTW) {
779		/*
780		 * this bit of trickery substitutes a kernel buffer for
781		 * the process that might be going away.
782		 */
783		pipe_clone_write_buffer(wpipe);
784	} else {
785		pipe_destroy_write_buffer(wpipe);
786	}
787	pipeunlock(wpipe);
788	return (error);
789
790error1:
791	wakeup(wpipe);
792	return (error);
793}
794#endif
795
796static int
797pipe_write(fp, uio, cred, flags, td)
798	struct file *fp;
799	struct uio *uio;
800	struct ucred *cred;
801	struct thread *td;
802	int flags;
803{
804	int error = 0;
805	int orig_resid;
806	struct pipe *wpipe, *rpipe;
807
808	rpipe = (struct pipe *) fp->f_data;
809	wpipe = rpipe->pipe_peer;
810
811	PIPE_LOCK(rpipe);
812	/*
813	 * detect loss of pipe read side, issue SIGPIPE if lost.
814	 */
815	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
816		PIPE_UNLOCK(rpipe);
817		return (EPIPE);
818	}
819	++wpipe->pipe_busy;
820
821	/*
822	 * If it is advantageous to resize the pipe buffer, do
823	 * so.
824	 */
825	if ((uio->uio_resid > PIPE_SIZE) &&
826		(nbigpipe < LIMITBIGPIPES) &&
827		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
828		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
829		(wpipe->pipe_buffer.cnt == 0)) {
830
831		if ((error = pipelock(wpipe, 1)) == 0) {
832			PIPE_GET_GIANT(wpipe);
833			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
834				nbigpipe++;
835			PIPE_DROP_GIANT(wpipe);
836			pipeunlock(wpipe);
837		}
838	}
839
840	/*
841	 * If an early error occurred, unbusy and return, waking up any pending
842	 * readers.
843	 */
844	if (error) {
845		--wpipe->pipe_busy;
846		if ((wpipe->pipe_busy == 0) &&
847		    (wpipe->pipe_state & PIPE_WANT)) {
848			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
849			wakeup(wpipe);
850		}
851		PIPE_UNLOCK(wpipe);
852		return(error);
853	}
854
855	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
856
857	orig_resid = uio->uio_resid;
858
859	while (uio->uio_resid) {
860		int space;
861
862#ifndef PIPE_NODIRECT
863		/*
864		 * If the transfer is large, we can gain performance if
865		 * we do process-to-process copies directly.
866		 * If the write is non-blocking, we don't use the
867		 * direct write mechanism.
868		 *
869		 * The direct write mechanism will detect the reader going
870		 * away on us.
871		 */
872		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
873		    (fp->f_flag & FNONBLOCK) == 0 &&
874		    (wpipe->pipe_map.kva ||
875		     (amountpipekva < LIMITPIPEKVA))) {
876			error = pipe_direct_write(wpipe, uio);
877			if (error)
878				break;
879			continue;
880		}
881#endif
882
883		/*
884		 * Pipe buffered writes cannot be coincident with
885		 * direct writes.  We wait until the currently executing
886		 * direct write is completed before we start filling the
887		 * pipe buffer.  We break out if a signal occurs or the
888		 * reader goes away.
889		 */
890	retrywrite:
891		while (wpipe->pipe_state & PIPE_DIRECTW) {
892			if (wpipe->pipe_state & PIPE_WANTR) {
893				wpipe->pipe_state &= ~PIPE_WANTR;
894				wakeup(wpipe);
895			}
896			error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
897			    "pipbww", 0);
898			if (wpipe->pipe_state & PIPE_EOF)
899				break;
900			if (error)
901				break;
902		}
903		if (wpipe->pipe_state & PIPE_EOF) {
904			error = EPIPE;
905			break;
906		}
907
908		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
909
910		/* Writes of size <= PIPE_BUF must be atomic. */
911		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
912			space = 0;
913
914		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
915			if ((error = pipelock(wpipe, 1)) == 0) {
916				int size;	/* Transfer size */
917				int segsize;	/* first segment to transfer */
918
919				/*
920				 * It is possible for a direct write to
921				 * slip in on us... handle it here...
922				 */
923				if (wpipe->pipe_state & PIPE_DIRECTW) {
924					pipeunlock(wpipe);
925					goto retrywrite;
926				}
927				/*
928				 * If a process blocked in uiomove, our
929				 * value for space might be bad.
930				 *
931				 * XXX will we be ok if the reader has gone
932				 * away here?
933				 */
934				if (space > wpipe->pipe_buffer.size -
935				    wpipe->pipe_buffer.cnt) {
936					pipeunlock(wpipe);
937					goto retrywrite;
938				}
939
940				/*
941				 * Transfer size is minimum of uio transfer
942				 * and free space in pipe buffer.
943				 */
944				if (space > uio->uio_resid)
945					size = uio->uio_resid;
946				else
947					size = space;
948				/*
949				 * First segment to transfer is minimum of
950				 * transfer size and contiguous space in
951				 * pipe buffer.  If first segment to transfer
952				 * is less than the transfer size, we've got
953				 * a wraparound in the buffer.
954				 */
955				segsize = wpipe->pipe_buffer.size -
956					wpipe->pipe_buffer.in;
957				if (segsize > size)
958					segsize = size;
959
960				/* Transfer first segment */
961
962				PIPE_UNLOCK(wpipe);
963				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
964						segsize, uio);
965				PIPE_LOCK(wpipe);
966
967				if (error == 0 && segsize < size) {
968					/*
969					 * Transfer remaining part now, to
970					 * support atomic writes.  Wraparound
971					 * happened.
972					 */
973					if (wpipe->pipe_buffer.in + segsize !=
974					    wpipe->pipe_buffer.size)
975						panic("Expected pipe buffer wraparound disappeared");
976
977					PIPE_UNLOCK(wpipe);
978					error = uiomove(&wpipe->pipe_buffer.buffer[0],
979							size - segsize, uio);
980					PIPE_LOCK(wpipe);
981				}
982				if (error == 0) {
983					wpipe->pipe_buffer.in += size;
984					if (wpipe->pipe_buffer.in >=
985					    wpipe->pipe_buffer.size) {
986						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
987							panic("Expected wraparound bad");
988						wpipe->pipe_buffer.in = size - segsize;
989					}
990
991					wpipe->pipe_buffer.cnt += size;
992					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
993						panic("Pipe buffer overflow");
994
995				}
996				pipeunlock(wpipe);
997			}
998			if (error)
999				break;
1000
1001		} else {
1002			/*
1003			 * If the "read-side" has been blocked, wake it up now.
1004			 */
1005			if (wpipe->pipe_state & PIPE_WANTR) {
1006				wpipe->pipe_state &= ~PIPE_WANTR;
1007				wakeup(wpipe);
1008			}
1009
1010			/*
1011			 * don't block on non-blocking I/O
1012			 */
1013			if (fp->f_flag & FNONBLOCK) {
1014				error = EAGAIN;
1015				break;
1016			}
1017
1018			/*
1019			 * We have no more space and have something to offer,
1020			 * wake up select/poll.
1021			 */
1022			pipeselwakeup(wpipe);
1023
1024			wpipe->pipe_state |= PIPE_WANTW;
1025			error = msleep(wpipe, PIPE_MTX(wpipe),
1026			    PRIBIO | PCATCH, "pipewr", 0);
1027			if (error != 0)
1028				break;
1029			/*
1030			 * If read side wants to go away, we just issue a signal
1031			 * to ourselves.
1032			 */
1033			if (wpipe->pipe_state & PIPE_EOF) {
1034				error = EPIPE;
1035				break;
1036			}
1037		}
1038	}
1039
1040	--wpipe->pipe_busy;
1041
1042	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1043		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1044		wakeup(wpipe);
1045	} else if (wpipe->pipe_buffer.cnt > 0) {
1046		/*
1047		 * If we have put any characters in the buffer, we wake up
1048		 * the reader.
1049		 */
1050		if (wpipe->pipe_state & PIPE_WANTR) {
1051			wpipe->pipe_state &= ~PIPE_WANTR;
1052			wakeup(wpipe);
1053		}
1054	}
1055
1056	/*
1057	 * Don't return EPIPE if I/O was successful
1058	 */
1059	if ((wpipe->pipe_buffer.cnt == 0) &&
1060	    (uio->uio_resid == 0) &&
1061	    (error == EPIPE)) {
1062		error = 0;
1063	}
1064
1065	if (error == 0)
1066		vfs_timestamp(&wpipe->pipe_mtime);
1067
1068	/*
1069	 * We have something to offer,
1070	 * wake up select/poll.
1071	 */
1072	if (wpipe->pipe_buffer.cnt)
1073		pipeselwakeup(wpipe);
1074
1075	PIPE_UNLOCK(wpipe);
1076	return (error);
1077}
1078
1079/*
1080 * we implement a very minimal set of ioctls for compatibility with sockets.
1081 */
1082static int
1083pipe_ioctl(fp, cmd, data, td)
1084	struct file *fp;
1085	u_long cmd;
1086	caddr_t data;
1087	struct thread *td;
1088{
1089	struct pipe *mpipe = (struct pipe *)fp->f_data;
1090
1091	switch (cmd) {
1092
1093	case FIONBIO:
1094		return (0);
1095
1096	case FIOASYNC:
1097		PIPE_LOCK(mpipe);
1098		if (*(int *)data) {
1099			mpipe->pipe_state |= PIPE_ASYNC;
1100		} else {
1101			mpipe->pipe_state &= ~PIPE_ASYNC;
1102		}
1103		PIPE_UNLOCK(mpipe);
1104		return (0);
1105
1106	case FIONREAD:
1107		PIPE_LOCK(mpipe);
1108		if (mpipe->pipe_state & PIPE_DIRECTW)
1109			*(int *)data = mpipe->pipe_map.cnt;
1110		else
1111			*(int *)data = mpipe->pipe_buffer.cnt;
1112		PIPE_UNLOCK(mpipe);
1113		return (0);
1114
1115	case FIOSETOWN:
1116		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1117
1118	case FIOGETOWN:
1119		*(int *)data = fgetown(mpipe->pipe_sigio);
1120		return (0);
1121
1122	/* This is deprecated, FIOSETOWN should be used instead. */
1123	case TIOCSPGRP:
1124		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1125
1126	/* This is deprecated, FIOGETOWN should be used instead. */
1127	case TIOCGPGRP:
1128		*(int *)data = -fgetown(mpipe->pipe_sigio);
1129		return (0);
1130
1131	}
1132	return (ENOTTY);
1133}
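
/*
 * Illustrative sketch, not part of this file: how a userland program might
 * use the ioctls handled above on the read end of a pipe.  FIONREAD reports
 * the number of bytes available, FIOSETOWN directs SIGIO to a process, and
 * FIOASYNC enables async notification.  Error handling is omitted.
 */
#if 0
#include <sys/ioctl.h>
#include <stdio.h>
#include <unistd.h>

static void
query_pipe(int rfd)
{
	int nbytes, on = 1, pid = getpid();

	ioctl(rfd, FIONREAD, &nbytes);		/* bytes currently readable */
	printf("%d bytes ready\n", nbytes);
	ioctl(rfd, FIOSETOWN, &pid);		/* deliver SIGIO to this process */
	ioctl(rfd, FIOASYNC, &on);		/* enable PIPE_ASYNC / SIGIO mode */
}
#endif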
1134
1135static int
1136pipe_poll(fp, events, cred, td)
1137	struct file *fp;
1138	int events;
1139	struct ucred *cred;
1140	struct thread *td;
1141{
1142	struct pipe *rpipe = (struct pipe *)fp->f_data;
1143	struct pipe *wpipe;
1144	int revents = 0;
1145
1146	wpipe = rpipe->pipe_peer;
1147	PIPE_LOCK(rpipe);
1148	if (events & (POLLIN | POLLRDNORM))
1149		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1150		    (rpipe->pipe_buffer.cnt > 0) ||
1151		    (rpipe->pipe_state & PIPE_EOF))
1152			revents |= events & (POLLIN | POLLRDNORM);
1153
1154	if (events & (POLLOUT | POLLWRNORM))
1155		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1156		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1157		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1158			revents |= events & (POLLOUT | POLLWRNORM);
1159
1160	if ((rpipe->pipe_state & PIPE_EOF) ||
1161	    (wpipe == NULL) ||
1162	    (wpipe->pipe_state & PIPE_EOF))
1163		revents |= POLLHUP;
1164
1165	if (revents == 0) {
1166		if (events & (POLLIN | POLLRDNORM)) {
1167			selrecord(td, &rpipe->pipe_sel);
1168			rpipe->pipe_state |= PIPE_SEL;
1169		}
1170
1171		if (events & (POLLOUT | POLLWRNORM)) {
1172			selrecord(td, &wpipe->pipe_sel);
1173			wpipe->pipe_state |= PIPE_SEL;
1174		}
1175	}
1176	PIPE_UNLOCK(rpipe);
1177
1178	return (revents);
1179}
1180
1181static int
1182pipe_stat(fp, ub, td)
1183	struct file *fp;
1184	struct stat *ub;
1185	struct thread *td;
1186{
1187	struct pipe *pipe = (struct pipe *)fp->f_data;
1188
1189	bzero((caddr_t)ub, sizeof(*ub));
1190	ub->st_mode = S_IFIFO;
1191	ub->st_blksize = pipe->pipe_buffer.size;
1192	ub->st_size = pipe->pipe_buffer.cnt;
1193	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1194	ub->st_atimespec = pipe->pipe_atime;
1195	ub->st_mtimespec = pipe->pipe_mtime;
1196	ub->st_ctimespec = pipe->pipe_ctime;
1197	ub->st_uid = fp->f_cred->cr_uid;
1198	ub->st_gid = fp->f_cred->cr_gid;
1199	/*
1200	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1201	 * XXX (st_dev, st_ino) should be unique.
1202	 */
1203	return (0);
1204}
1205
1206/* ARGSUSED */
1207static int
1208pipe_close(fp, td)
1209	struct file *fp;
1210	struct thread *td;
1211{
1212	struct pipe *cpipe = (struct pipe *)fp->f_data;
1213
1214	fp->f_ops = &badfileops;
1215	fp->f_data = NULL;
1216	funsetown(cpipe->pipe_sigio);
1217	pipeclose(cpipe);
1218	return (0);
1219}
1220
1221static void
1222pipe_free_kmem(cpipe)
1223	struct pipe *cpipe;
1224{
1225	GIANT_REQUIRED;
1226
1227	if (cpipe->pipe_buffer.buffer != NULL) {
1228		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1229			--nbigpipe;
1230		amountpipekva -= cpipe->pipe_buffer.size;
1231		kmem_free(kernel_map,
1232			(vm_offset_t)cpipe->pipe_buffer.buffer,
1233			cpipe->pipe_buffer.size);
1234		cpipe->pipe_buffer.buffer = NULL;
1235	}
1236#ifndef PIPE_NODIRECT
1237	if (cpipe->pipe_map.kva != 0) {
1238		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1239		kmem_free(kernel_map,
1240			cpipe->pipe_map.kva,
1241			cpipe->pipe_buffer.size + PAGE_SIZE);
1242		cpipe->pipe_map.cnt = 0;
1243		cpipe->pipe_map.kva = 0;
1244		cpipe->pipe_map.pos = 0;
1245		cpipe->pipe_map.npages = 0;
1246	}
1247#endif
1248}
1249
1250/*
1251 * shutdown the pipe
1252 */
1253static void
1254pipeclose(cpipe)
1255	struct pipe *cpipe;
1256{
1257	struct pipe *ppipe;
1258
1259	if (cpipe) {
1260		PIPE_LOCK(cpipe);
1261
1262		pipeselwakeup(cpipe);
1263
1264		/*
1265		 * If the other side is blocked, wake it up saying that
1266		 * we want to close it down.
1267		 */
1268		while (cpipe->pipe_busy) {
1269			wakeup(cpipe);
1270			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1271			msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1272		}
1273
1274		/*
1275		 * Disconnect from peer
1276		 */
1277		if ((ppipe = cpipe->pipe_peer) != NULL) {
1278			pipeselwakeup(ppipe);
1279
1280			ppipe->pipe_state |= PIPE_EOF;
1281			wakeup(ppipe);
1282			KNOTE(&ppipe->pipe_sel.si_note, 0);
1283			ppipe->pipe_peer = NULL;
1284		}
1285		/*
1286		 * free resources
1287		 */
1288		PIPE_UNLOCK(cpipe);
1289		mtx_lock(&Giant);
1290		pipe_free_kmem(cpipe);
1291		zfree(pipe_zone, cpipe);
1292		mtx_unlock(&Giant);
1293	}
1294}
1295
1296/*ARGSUSED*/
1297static int
1298pipe_kqfilter(struct file *fp, struct knote *kn)
1299{
1300	struct pipe *cpipe;
1301
1302	cpipe = (struct pipe *)kn->kn_fp->f_data;
1303	switch (kn->kn_filter) {
1304	case EVFILT_READ:
1305		kn->kn_fop = &pipe_rfiltops;
1306		break;
1307	case EVFILT_WRITE:
1308		kn->kn_fop = &pipe_wfiltops;
1309		cpipe = cpipe->pipe_peer;
1310		break;
1311	default:
1312		return (1);
1313	}
1314	kn->kn_hook = (caddr_t)cpipe;
1315
1316	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1317	return (0);
1318}
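
/*
 * Illustrative sketch, not part of this file: registering a kqueue read
 * filter on a pipe descriptor from userland, which reaches pipe_kqfilter()
 * above with kn_filter == EVFILT_READ.  Error handling is omitted.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>

static void
wait_for_pipe_data(int rfd)
{
	struct kevent kev;
	int kq;

	kq = kqueue();
	EV_SET(&kev, rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	kevent(kq, &kev, 1, NULL, 0, NULL);	/* register the filter */
	kevent(kq, NULL, 0, &kev, 1, NULL);	/* block until readable or EOF */
	close(kq);
}
#endif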
1319
1320static void
1321filt_pipedetach(struct knote *kn)
1322{
1323	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1324
1325	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1326}
1327
1328/*ARGSUSED*/
1329static int
1330filt_piperead(struct knote *kn, long hint)
1331{
1332	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1333	struct pipe *wpipe = rpipe->pipe_peer;
1334
1335	kn->kn_data = rpipe->pipe_buffer.cnt;
1336	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1337		kn->kn_data = rpipe->pipe_map.cnt;
1338
1339	if ((rpipe->pipe_state & PIPE_EOF) ||
1340	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1341		kn->kn_flags |= EV_EOF;
1342		return (1);
1343	}
1344	return (kn->kn_data > 0);
1345}
1346
1347/*ARGSUSED*/
1348static int
1349filt_pipewrite(struct knote *kn, long hint)
1350{
1351	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1352	struct pipe *wpipe = rpipe->pipe_peer;
1353
1354	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1355		kn->kn_data = 0;
1356		kn->kn_flags |= EV_EOF;
1357		return (1);
1358	}
1359	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1360	if (wpipe->pipe_state & PIPE_DIRECTW)
1361		kn->kn_data = 0;
1362
1363	return (kn->kn_data >= PIPE_BUF);
1364}
1365