sys_pipe.c revision 77035
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 77035 2001-05-23 10:26:36Z alfred $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
35 * the receiving process can copy it directly from the pages in the sending
36 * process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
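/*
 * Editor's illustrative sketch, not part of the original file: the choice
 * between the buffered and the direct path is invisible to user code and
 * depends only on the size of each individual write.  A minimal userland
 * example, assuming PIPE_MINDIRECT (from <sys/pipe.h>) is well below the
 * 32KB used here; names and sizes are arbitrary:
 *
 *	#include <sys/types.h>
 *	#include <sys/wait.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		char out[32768], in[32768];
 *		ssize_t n, r;
 *
 *		if (pipe(fds) == -1)
 *			return (1);
 *		if (fork() == 0) {
 *			(void)write(fds[1], "ping", 4);		// buffered path
 *			memset(out, 'x', sizeof(out));
 *			(void)write(fds[1], out, sizeof(out));	// direct-path candidate
 *			_exit(0);
 *		}
 *		(void)read(fds[0], in, 4);
 *		for (n = 0; n < (ssize_t)sizeof(in); n += r)
 *			if ((r = read(fds[0], in + n, sizeof(in) - n)) <= 0)
 *				break;
 *		(void)wait(NULL);
 *		return (0);
 *	}
 */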
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/lock.h>
59#include <sys/mutex.h>
60#include <sys/ttycom.h>
61#include <sys/stat.h>
62#include <sys/poll.h>
63#include <sys/selinfo.h>
64#include <sys/signalvar.h>
65#include <sys/sysproto.h>
66#include <sys/pipe.h>
67#include <sys/proc.h>
68#include <sys/vnode.h>
69#include <sys/uio.h>
70#include <sys/event.h>
71
72#include <vm/vm.h>
73#include <vm/vm_param.h>
74#include <vm/vm_object.h>
75#include <vm/vm_kern.h>
76#include <vm/vm_extern.h>
77#include <vm/pmap.h>
78#include <vm/vm_map.h>
79#include <vm/vm_page.h>
80#include <vm/vm_zone.h>
81
82/*
83 * Use this define if you want to disable *fancy* VM things.  Expect an
84 * approx 30% decrease in transfer rate.  This could be useful for
85 * NetBSD or OpenBSD.
86 */
87/* #define PIPE_NODIRECT */
88
89/*
90 * interfaces to the outside world
91 */
92static int pipe_read __P((struct file *fp, struct uio *uio,
93		struct ucred *cred, int flags, struct proc *p));
94static int pipe_write __P((struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct proc *p));
96static int pipe_close __P((struct file *fp, struct proc *p));
97static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
98		struct proc *p));
99static int pipe_kqfilter __P((struct file *fp, struct knote *kn));
100static int pipe_stat __P((struct file *fp, struct stat *sb, struct proc *p));
101static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct proc *p));
102
103static struct fileops pipeops = {
104	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
105	pipe_stat, pipe_close
106};
107
108static void	filt_pipedetach(struct knote *kn);
109static int	filt_piperead(struct knote *kn, long hint);
110static int	filt_pipewrite(struct knote *kn, long hint);
111
112static struct filterops pipe_rfiltops =
113	{ 1, NULL, filt_pipedetach, filt_piperead };
114static struct filterops pipe_wfiltops =
115	{ 1, NULL, filt_pipedetach, filt_pipewrite };
116
117
118/*
119 * Default pipe buffer size(s); this can be kind-of large now because pipe
120 * space is pageable.  The pipe code will try to maintain locality of
121 * reference for performance reasons, so small amounts of outstanding I/O
122 * will not wipe the cache.
123 */
124#define MINPIPESIZE (PIPE_SIZE/3)
125#define MAXPIPESIZE (2*PIPE_SIZE/3)
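/*
 * Editor's note, a worked example (assuming the PIPE_SIZE of 16384 from
 * <sys/pipe.h>): MINPIPESIZE is then 5461 bytes and MAXPIPESIZE 10922
 * bytes.  MINPIPESIZE matters for the write-side wakeup at the end of
 * pipe_read() below, which only fires once the buffer has drained below
 * roughly one third of its size.
 */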
126
127/*
128 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
129 * it is there so that on large systems we don't exhaust it.
130 */
131#define MAXPIPEKVA (8*1024*1024)
132
133/*
134 * Limit for direct transfers; we cannot, of course, limit
135 * the amount of kva for pipes in general, though.
136 */
137#define LIMITPIPEKVA (16*1024*1024)
138
139/*
140 * Limit the number of "big" pipes
141 */
142#define LIMITBIGPIPES	32
143static int nbigpipe;
144
145static int amountpipekva;
146
147static void pipeclose __P((struct pipe *cpipe));
148static void pipe_free_kmem __P((struct pipe *cpipe));
149static int pipe_create __P((struct pipe **cpipep));
150static __inline int pipelock __P((struct pipe *cpipe, int catch));
151static __inline void pipeunlock __P((struct pipe *cpipe));
152static __inline void pipeselwakeup __P((struct pipe *cpipe));
153#ifndef PIPE_NODIRECT
154static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
155static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
156static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
157static void pipe_clone_write_buffer __P((struct pipe *wpipe));
158#endif
159static int pipespace __P((struct pipe *cpipe, int size));
160
161static vm_zone_t pipe_zone;
162
163/*
164 * The pipe system call for the DTYPE_PIPE type of pipes
165 */
166
167/* ARGSUSED */
168int
169pipe(p, uap)
170	struct proc *p;
171	struct pipe_args /* {
172		int	dummy;
173	} */ *uap;
174{
175	struct filedesc *fdp = p->p_fd;
176	struct file *rf, *wf;
177	struct pipe *rpipe, *wpipe;
178	int fd, error;
179
180	if (pipe_zone == NULL)
181		pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
182
183	rpipe = wpipe = NULL;
184	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
185		pipeclose(rpipe);
186		pipeclose(wpipe);
187		return (ENFILE);
188	}
189
190	rpipe->pipe_state |= PIPE_DIRECTOK;
191	wpipe->pipe_state |= PIPE_DIRECTOK;
192
193	error = falloc(p, &rf, &fd);
194	if (error) {
195		pipeclose(rpipe);
196		pipeclose(wpipe);
197		return (error);
198	}
199	fhold(rf);
200	p->p_retval[0] = fd;
201
202	/*
203	 * Warning: once we've gotten past allocation of the fd for the
204	 * read-side, we can only drop the read side via fdrop() in order
205	 * to avoid races against processes which manage to dup() the read
206	 * side while we are blocked trying to allocate the write side.
207	 */
208	rf->f_flag = FREAD | FWRITE;
209	rf->f_type = DTYPE_PIPE;
210	rf->f_data = (caddr_t)rpipe;
211	rf->f_ops = &pipeops;
212	error = falloc(p, &wf, &fd);
213	if (error) {
214		if (fdp->fd_ofiles[p->p_retval[0]] == rf) {
215			fdp->fd_ofiles[p->p_retval[0]] = NULL;
216			fdrop(rf, p);
217		}
218		fdrop(rf, p);
219		/* rpipe has been closed by fdrop(). */
220		pipeclose(wpipe);
221		return (error);
222	}
223	wf->f_flag = FREAD | FWRITE;
224	wf->f_type = DTYPE_PIPE;
225	wf->f_data = (caddr_t)wpipe;
226	wf->f_ops = &pipeops;
227	p->p_retval[1] = fd;
228
229	rpipe->pipe_peer = wpipe;
230	wpipe->pipe_peer = rpipe;
231	fdrop(rf, p);
232
233	return (0);
234}
235
236/*
237 * Allocate kva for the pipe circular buffer; the space is pageable.
238 * This routine will 'realloc' the size of a pipe safely: if the
239 * allocation fails, it will retain the old buffer
240 * and return ENOMEM.
241 */
242static int
243pipespace(cpipe, size)
244	struct pipe *cpipe;
245	int size;
246{
247	struct vm_object *object;
248	caddr_t buffer;
249	int npages, error;
250
251	npages = round_page(size)/PAGE_SIZE;
252	/*
253	 * Create an object; I don't like the idea of paging to/from
254	 * kernel_object.
255	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
256	 */
257	mtx_lock(&vm_mtx);
258	object = vm_object_allocate(OBJT_DEFAULT, npages);
259	buffer = (caddr_t) vm_map_min(kernel_map);
260
261	/*
262	 * Insert the object into the kernel map, and allocate kva for it.
263	 * The map entry is, by default, pageable.
264	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
265	 */
266	error = vm_map_find(kernel_map, object, 0,
267		(vm_offset_t *) &buffer, size, 1,
268		VM_PROT_ALL, VM_PROT_ALL, 0);
269
270	if (error != KERN_SUCCESS) {
271		vm_object_deallocate(object);
272		mtx_unlock(&vm_mtx);
273		return (ENOMEM);
274	}
275
276	/* free old resources if we're resizing */
277	pipe_free_kmem(cpipe);
278	mtx_unlock(&vm_mtx);
279	cpipe->pipe_buffer.object = object;
280	cpipe->pipe_buffer.buffer = buffer;
281	cpipe->pipe_buffer.size = size;
282	cpipe->pipe_buffer.in = 0;
283	cpipe->pipe_buffer.out = 0;
284	cpipe->pipe_buffer.cnt = 0;
285	amountpipekva += cpipe->pipe_buffer.size;
286	return (0);
287}
288
289/*
290 * initialize and allocate VM and memory for pipe
291 */
292static int
293pipe_create(cpipep)
294	struct pipe **cpipep;
295{
296	struct pipe *cpipe;
297	int error;
298
299	*cpipep = zalloc(pipe_zone);
300	if (*cpipep == NULL)
301		return (ENOMEM);
302
303	cpipe = *cpipep;
304
305	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
306	cpipe->pipe_buffer.object = NULL;
307#ifndef PIPE_NODIRECT
308	cpipe->pipe_map.kva = 0;
309#endif
310	/*
311	 * protect so pipeclose() doesn't follow a junk pointer
312	 * if pipespace() fails.
313	 */
314	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
315	cpipe->pipe_state = 0;
316	cpipe->pipe_peer = NULL;
317	cpipe->pipe_busy = 0;
318
319#ifndef PIPE_NODIRECT
320	/*
321	 * pipe data structure initializations to support direct pipe I/O
322	 */
323	cpipe->pipe_map.cnt = 0;
324	cpipe->pipe_map.kva = 0;
325	cpipe->pipe_map.pos = 0;
326	cpipe->pipe_map.npages = 0;
327	/* cpipe->pipe_map.ms[] = invalid */
328#endif
329
330	error = pipespace(cpipe, PIPE_SIZE);
331	if (error)
332		return (error);
333
334	vfs_timestamp(&cpipe->pipe_ctime);
335	cpipe->pipe_atime = cpipe->pipe_ctime;
336	cpipe->pipe_mtime = cpipe->pipe_ctime;
337
338	return (0);
339}
340
341
342/*
343 * lock a pipe for I/O, blocking other access
344 */
345static __inline int
346pipelock(cpipe, catch)
347	struct pipe *cpipe;
348	int catch;
349{
350	int error;
351
352	while (cpipe->pipe_state & PIPE_LOCK) {
353		cpipe->pipe_state |= PIPE_LWANT;
354		error = tsleep(cpipe, catch ? (PRIBIO | PCATCH) : PRIBIO,
355		    "pipelk", 0);
356		if (error != 0)
357			return (error);
358	}
359	cpipe->pipe_state |= PIPE_LOCK;
360	return (0);
361}
362
363/*
364 * unlock a pipe I/O lock
365 */
366static __inline void
367pipeunlock(cpipe)
368	struct pipe *cpipe;
369{
370
371	cpipe->pipe_state &= ~PIPE_LOCK;
372	if (cpipe->pipe_state & PIPE_LWANT) {
373		cpipe->pipe_state &= ~PIPE_LWANT;
374		wakeup(cpipe);
375	}
376}
377
378static __inline void
379pipeselwakeup(cpipe)
380	struct pipe *cpipe;
381{
382
383	if (cpipe->pipe_state & PIPE_SEL) {
384		cpipe->pipe_state &= ~PIPE_SEL;
385		selwakeup(&cpipe->pipe_sel);
386	}
387	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
388		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
389	KNOTE(&cpipe->pipe_sel.si_note, 0);
390}
391
392/* ARGSUSED */
393static int
394pipe_read(fp, uio, cred, flags, p)
395	struct file *fp;
396	struct uio *uio;
397	struct ucred *cred;
398	struct proc *p;
399	int flags;
400{
401	struct pipe *rpipe = (struct pipe *) fp->f_data;
402	int error;
403	int nread = 0;
404	u_int size;
405
406	++rpipe->pipe_busy;
407	error = pipelock(rpipe, 1);
408	if (error)
409		goto unlocked_error;
410
411	while (uio->uio_resid) {
412		/*
413		 * normal pipe buffer receive
414		 */
415		if (rpipe->pipe_buffer.cnt > 0) {
416			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
417			if (size > rpipe->pipe_buffer.cnt)
418				size = rpipe->pipe_buffer.cnt;
419			if (size > (u_int) uio->uio_resid)
420				size = (u_int) uio->uio_resid;
421
422			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
423					size, uio);
424			if (error)
425				break;
426
427			rpipe->pipe_buffer.out += size;
428			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
429				rpipe->pipe_buffer.out = 0;
430
431			rpipe->pipe_buffer.cnt -= size;
432
433			/*
434			 * If there is no more to read in the pipe, reset
435			 * its pointers to the beginning.  This improves
436			 * cache hit stats.
437			 */
438			if (rpipe->pipe_buffer.cnt == 0) {
439				rpipe->pipe_buffer.in = 0;
440				rpipe->pipe_buffer.out = 0;
441			}
442			nread += size;
443#ifndef PIPE_NODIRECT
444		/*
445		 * Direct copy, bypassing a kernel buffer.
446		 */
447		} else if ((size = rpipe->pipe_map.cnt) &&
448			   (rpipe->pipe_state & PIPE_DIRECTW)) {
449			caddr_t	va;
450			if (size > (u_int) uio->uio_resid)
451				size = (u_int) uio->uio_resid;
452
453			va = (caddr_t) rpipe->pipe_map.kva +
454			    rpipe->pipe_map.pos;
455			error = uiomove(va, size, uio);
456			if (error)
457				break;
458			nread += size;
459			rpipe->pipe_map.pos += size;
460			rpipe->pipe_map.cnt -= size;
461			if (rpipe->pipe_map.cnt == 0) {
462				rpipe->pipe_state &= ~PIPE_DIRECTW;
463				wakeup(rpipe);
464			}
465#endif
466		} else {
467			/*
468			 * detect EOF condition
469			 * read returns 0 on EOF, no need to set error
470			 */
471			if (rpipe->pipe_state & PIPE_EOF)
472				break;
473
474			/*
475			 * If the "write-side" has been blocked, wake it up now.
476			 */
477			if (rpipe->pipe_state & PIPE_WANTW) {
478				rpipe->pipe_state &= ~PIPE_WANTW;
479				wakeup(rpipe);
480			}
481
482			/*
483			 * Break if some data was read.
484			 */
485			if (nread > 0)
486				break;
487
488			/*
489			 * Unlock the pipe buffer for our remaining processing.  We
490			 * will either break out with an error or we will sleep and
491			 * relock to loop.
492			 */
493			pipeunlock(rpipe);
494
495			/*
496			 * Handle non-blocking mode operation or
497			 * wait for more data.
498			 */
499			if (fp->f_flag & FNONBLOCK) {
500				error = EAGAIN;
501			} else {
502				rpipe->pipe_state |= PIPE_WANTR;
503				if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
504					error = pipelock(rpipe, 1);
505			}
506			if (error)
507				goto unlocked_error;
508		}
509	}
510	pipeunlock(rpipe);
511
512	if (error == 0)
513		vfs_timestamp(&rpipe->pipe_atime);
514unlocked_error:
515	--rpipe->pipe_busy;
516
517	/*
518	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
519	 */
520	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
521		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
522		wakeup(rpipe);
523	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
524		/*
525		 * Handle write blocking hysteresis.
526		 */
527		if (rpipe->pipe_state & PIPE_WANTW) {
528			rpipe->pipe_state &= ~PIPE_WANTW;
529			wakeup(rpipe);
530		}
531	}
532
533	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
534		pipeselwakeup(rpipe);
535
536	return (error);
537}
538
539#ifndef PIPE_NODIRECT
540/*
541 * Map the sending process's buffer into kernel space and wire it.
542 * This is similar to a physical write operation.
543 */
544static int
545pipe_build_write_buffer(wpipe, uio)
546	struct pipe *wpipe;
547	struct uio *uio;
548{
549	u_int size;
550	int i;
551	vm_offset_t addr, endaddr, paddr;
552
553	size = (u_int) uio->uio_iov->iov_len;
554	if (size > wpipe->pipe_buffer.size)
555		size = wpipe->pipe_buffer.size;
556
557	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
558	mtx_lock(&vm_mtx);
559	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
560	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
561		vm_page_t m;
562
563		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
564		    (paddr = pmap_kextract(addr)) == 0) {
565			int j;
566
567			for (j = 0; j < i; j++)
568				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
569			mtx_unlock(&vm_mtx);
570			return (EFAULT);
571		}
572
573		m = PHYS_TO_VM_PAGE(paddr);
574		vm_page_wire(m);
575		wpipe->pipe_map.ms[i] = m;
576	}
577
578/*
579 * set up the control block
580 */
581	wpipe->pipe_map.npages = i;
582	wpipe->pipe_map.pos =
583	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
584	wpipe->pipe_map.cnt = size;
585
586/*
587 * and map the buffer
588 */
589	if (wpipe->pipe_map.kva == 0) {
590		/*
591		 * We need to allocate space for an extra page because the
592		 * address range might (will) span pages at times.
593		 */
594		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
595			wpipe->pipe_buffer.size + PAGE_SIZE);
596		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
597	}
598	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
599		wpipe->pipe_map.npages);
600
601	mtx_unlock(&vm_mtx);
602/*
603 * and update the uio data
604 */
605
606	uio->uio_iov->iov_len -= size;
607	uio->uio_iov->iov_base += size;
608	if (uio->uio_iov->iov_len == 0)
609		uio->uio_iov++;
610	uio->uio_resid -= size;
611	uio->uio_offset += size;
612	return (0);
613}
614
615/*
616 * unmap and unwire the process buffer
617 */
618static void
619pipe_destroy_write_buffer(wpipe)
620	struct pipe *wpipe;
621{
622	int i;
623
624	mtx_lock(&vm_mtx);
625	if (wpipe->pipe_map.kva) {
626		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
627
628		if (amountpipekva > MAXPIPEKVA) {
629			vm_offset_t kva = wpipe->pipe_map.kva;
630			wpipe->pipe_map.kva = 0;
631			kmem_free(kernel_map, kva,
632				wpipe->pipe_buffer.size + PAGE_SIZE);
633			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
634		}
635	}
636	for (i = 0; i < wpipe->pipe_map.npages; i++)
637		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
638	mtx_unlock(&vm_mtx);
639}
640
641/*
642 * In the case of a signal, the writing process might go away.  This
643 * code copies the data into the circular buffer so that the source
644 * pages can be freed without loss of data.
645 */
646static void
647pipe_clone_write_buffer(wpipe)
648	struct pipe *wpipe;
649{
650	int size;
651	int pos;
652
653	size = wpipe->pipe_map.cnt;
654	pos = wpipe->pipe_map.pos;
655	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
656	    (caddr_t) wpipe->pipe_buffer.buffer, size);
657
658	wpipe->pipe_buffer.in = size;
659	wpipe->pipe_buffer.out = 0;
660	wpipe->pipe_buffer.cnt = size;
661	wpipe->pipe_state &= ~PIPE_DIRECTW;
662
663	pipe_destroy_write_buffer(wpipe);
664}
665
666/*
667 * This implements the pipe buffer write mechanism.  Note that only
668 * a direct write OR a normal pipe write can be pending at any given time.
669 * If there are any characters in the pipe buffer, the direct write will
670 * be deferred until the receiving process grabs all of the bytes from
671 * the pipe buffer.  Then the direct mapping write is set up.
672 */
673static int
674pipe_direct_write(wpipe, uio)
675	struct pipe *wpipe;
676	struct uio *uio;
677{
678	int error;
679
680retry:
681	while (wpipe->pipe_state & PIPE_DIRECTW) {
682		if (wpipe->pipe_state & PIPE_WANTR) {
683			wpipe->pipe_state &= ~PIPE_WANTR;
684			wakeup(wpipe);
685		}
686		wpipe->pipe_state |= PIPE_WANTW;
687		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdww", 0);
688		if (error)
689			goto error1;
690		if (wpipe->pipe_state & PIPE_EOF) {
691			error = EPIPE;
692			goto error1;
693		}
694	}
695	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
696	if (wpipe->pipe_buffer.cnt > 0) {
697		if (wpipe->pipe_state & PIPE_WANTR) {
698			wpipe->pipe_state &= ~PIPE_WANTR;
699			wakeup(wpipe);
700		}
701
702		wpipe->pipe_state |= PIPE_WANTW;
703		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwc", 0);
704		if (error)
705			goto error1;
706		if (wpipe->pipe_state & PIPE_EOF) {
707			error = EPIPE;
708			goto error1;
709		}
710		goto retry;
711	}
712
713	wpipe->pipe_state |= PIPE_DIRECTW;
714
715	error = pipe_build_write_buffer(wpipe, uio);
716	if (error) {
717		wpipe->pipe_state &= ~PIPE_DIRECTW;
718		goto error1;
719	}
720
721	error = 0;
722	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
723		if (wpipe->pipe_state & PIPE_EOF) {
724			pipelock(wpipe, 0);
725			pipe_destroy_write_buffer(wpipe);
726			pipeunlock(wpipe);
727			pipeselwakeup(wpipe);
728			error = EPIPE;
729			goto error1;
730		}
731		if (wpipe->pipe_state & PIPE_WANTR) {
732			wpipe->pipe_state &= ~PIPE_WANTR;
733			wakeup(wpipe);
734		}
735		pipeselwakeup(wpipe);
736		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
737	}
738
739	pipelock(wpipe,0);
740	if (wpipe->pipe_state & PIPE_DIRECTW) {
741		/*
742		 * this bit of trickery substitutes a kernel buffer for
743		 * the process that might be going away.
744		 */
745		pipe_clone_write_buffer(wpipe);
746	} else {
747		pipe_destroy_write_buffer(wpipe);
748	}
749	pipeunlock(wpipe);
750	return (error);
751
752error1:
753	wakeup(wpipe);
754	return (error);
755}
756#endif
757
758static int
759pipe_write(fp, uio, cred, flags, p)
760	struct file *fp;
761	struct uio *uio;
762	struct ucred *cred;
763	struct proc *p;
764	int flags;
765{
766	int error = 0;
767	int orig_resid;
768	struct pipe *wpipe, *rpipe;
769
770	rpipe = (struct pipe *) fp->f_data;
771	wpipe = rpipe->pipe_peer;
772
773	/*
774	 * detect loss of pipe read side, issue SIGPIPE if lost.
775	 */
776	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
777		return (EPIPE);
778	}
779
780	/*
781	 * If it is advantageous to resize the pipe buffer, do
782	 * so.
783	 */
784	if ((uio->uio_resid > PIPE_SIZE) &&
785		(nbigpipe < LIMITBIGPIPES) &&
786		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
787		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
788		(wpipe->pipe_buffer.cnt == 0)) {
789
790		if ((error = pipelock(wpipe,1)) == 0) {
791			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
792				nbigpipe++;
793			pipeunlock(wpipe);
794		} else {
795			return (error);
796		}
797	}
798
799	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
800
801	++wpipe->pipe_busy;
802	orig_resid = uio->uio_resid;
803	while (uio->uio_resid) {
804		int space;
805
806#ifndef PIPE_NODIRECT
807		/*
808		 * If the transfer is large, we can gain performance if
809		 * we do process-to-process copies directly.
810		 * If the write is non-blocking, we don't use the
811		 * direct write mechanism.
812		 *
813		 * The direct write mechanism will detect the reader going
814		 * away on us.
815		 */
816		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
817		    (fp->f_flag & FNONBLOCK) == 0 &&
818		    (wpipe->pipe_map.kva ||
819		     (amountpipekva < LIMITPIPEKVA))) {
820			error = pipe_direct_write(wpipe, uio);
821			if (error)
822				break;
823			continue;
824		}
825#endif
826
827		/*
828		 * Pipe buffered writes cannot be coincident with
829		 * direct writes.  We wait until the currently executing
830		 * direct write is completed before we start filling the
831		 * pipe buffer.  We break out if a signal occurs or the
832		 * reader goes away.
833		 */
834	retrywrite:
835		while (wpipe->pipe_state & PIPE_DIRECTW) {
836			if (wpipe->pipe_state & PIPE_WANTR) {
837				wpipe->pipe_state &= ~PIPE_WANTR;
838				wakeup(wpipe);
839			}
840			error = tsleep(wpipe, PRIBIO | PCATCH, "pipbww", 0);
841			if (wpipe->pipe_state & PIPE_EOF)
842				break;
843			if (error)
844				break;
845		}
846		if (wpipe->pipe_state & PIPE_EOF) {
847			error = EPIPE;
848			break;
849		}
850
851		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
852
853		/* Writes of size <= PIPE_BUF must be atomic. */
854		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
855			space = 0;
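		/*
		 * Editor's note, a worked example of the atomicity rule
		 * above (sample numbers only): with orig_resid = 512
		 * (i.e. <= PIPE_BUF) and only 300 bytes of space free,
		 * space is forced to 0, so this write takes the blocking
		 * path below instead of being split across two buffer
		 * fills that a reader could observe separately.
		 */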
856
857		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
858			if ((error = pipelock(wpipe,1)) == 0) {
859				int size;	/* Transfer size */
860				int segsize;	/* first segment to transfer */
861
862				/*
863				 * It is possible for a direct write to
864				 * slip in on us... handle it here...
865				 */
866				if (wpipe->pipe_state & PIPE_DIRECTW) {
867					pipeunlock(wpipe);
868					goto retrywrite;
869				}
870				/*
871				 * If a process blocked in uiomove, our
872				 * value for space might be bad.
873				 *
874				 * XXX will we be ok if the reader has gone
875				 * away here?
876				 */
877				if (space > wpipe->pipe_buffer.size -
878				    wpipe->pipe_buffer.cnt) {
879					pipeunlock(wpipe);
880					goto retrywrite;
881				}
882
883				/*
884				 * Transfer size is minimum of uio transfer
885				 * and free space in pipe buffer.
886				 */
887				if (space > uio->uio_resid)
888					size = uio->uio_resid;
889				else
890					size = space;
891				/*
892				 * First segment to transfer is minimum of
893				 * transfer size and contiguous space in
894				 * pipe buffer.  If first segment to transfer
895				 * is less than the transfer size, we've got
896				 * a wraparound in the buffer.
897				 */
898				segsize = wpipe->pipe_buffer.size -
899					wpipe->pipe_buffer.in;
900				if (segsize > size)
901					segsize = size;
902
903				/* Transfer first segment */
904
905				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
906						segsize, uio);
907
908				if (error == 0 && segsize < size) {
909					/*
910					 * Transfer remaining part now, to
911					 * support atomic writes.  Wraparound
912					 * happened.
913					 */
914					if (wpipe->pipe_buffer.in + segsize !=
915					    wpipe->pipe_buffer.size)
916						panic("Expected pipe buffer wraparound disappeared");
917
918					error = uiomove(&wpipe->pipe_buffer.buffer[0],
919							size - segsize, uio);
920				}
921				if (error == 0) {
922					wpipe->pipe_buffer.in += size;
923					if (wpipe->pipe_buffer.in >=
924					    wpipe->pipe_buffer.size) {
925						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
926							panic("Expected wraparound bad");
927						wpipe->pipe_buffer.in = size - segsize;
928					}
929
930					wpipe->pipe_buffer.cnt += size;
931					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
932						panic("Pipe buffer overflow");
933
934				}
935				pipeunlock(wpipe);
936			}
937			if (error)
938				break;
939
940		} else {
941			/*
942			 * If the "read-side" has been blocked, wake it up now.
943			 */
944			if (wpipe->pipe_state & PIPE_WANTR) {
945				wpipe->pipe_state &= ~PIPE_WANTR;
946				wakeup(wpipe);
947			}
948
949			/*
950			 * don't block on non-blocking I/O
951			 */
952			if (fp->f_flag & FNONBLOCK) {
953				error = EAGAIN;
954				break;
955			}
956
957			/*
958			 * We have no more space and have something to offer,
959			 * wake up select/poll.
960			 */
961			pipeselwakeup(wpipe);
962
963			wpipe->pipe_state |= PIPE_WANTW;
964			error = tsleep(wpipe, PRIBIO | PCATCH, "pipewr", 0);
965			if (error != 0)
966				break;
967			/*
968			 * If read side wants to go away, we just issue a signal
969			 * to ourselves.
970			 */
971			if (wpipe->pipe_state & PIPE_EOF) {
972				error = EPIPE;
973				break;
974			}
975		}
976	}
977
978	--wpipe->pipe_busy;
979	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
980		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
981		wakeup(wpipe);
982	} else if (wpipe->pipe_buffer.cnt > 0) {
983		/*
984		 * If we have put any characters in the buffer, we wake up
985		 * the reader.
986		 */
987		if (wpipe->pipe_state & PIPE_WANTR) {
988			wpipe->pipe_state &= ~PIPE_WANTR;
989			wakeup(wpipe);
990		}
991	}
992
993	/*
994	 * Don't return EPIPE if I/O was successful
995	 */
996	if ((wpipe->pipe_buffer.cnt == 0) &&
997		(uio->uio_resid == 0) &&
998		(error == EPIPE))
999		error = 0;
1000
1001	if (error == 0)
1002		vfs_timestamp(&wpipe->pipe_mtime);
1003
1004	/*
1005	 * We have something to offer,
1006	 * wake up select/poll.
1007	 */
1008	if (wpipe->pipe_buffer.cnt)
1009		pipeselwakeup(wpipe);
1010
1011	return (error);
1012}
1013
1014/*
1015 * we implement a very minimal set of ioctls for compatibility with sockets.
1016 */
1017int
1018pipe_ioctl(fp, cmd, data, p)
1019	struct file *fp;
1020	u_long cmd;
1021	caddr_t data;
1022	struct proc *p;
1023{
1024	struct pipe *mpipe = (struct pipe *)fp->f_data;
1025
1026	switch (cmd) {
1027
1028	case FIONBIO:
1029		return (0);
1030
1031	case FIOASYNC:
1032		if (*(int *)data) {
1033			mpipe->pipe_state |= PIPE_ASYNC;
1034		} else {
1035			mpipe->pipe_state &= ~PIPE_ASYNC;
1036		}
1037		return (0);
1038
1039	case FIONREAD:
1040		if (mpipe->pipe_state & PIPE_DIRECTW)
1041			*(int *)data = mpipe->pipe_map.cnt;
1042		else
1043			*(int *)data = mpipe->pipe_buffer.cnt;
1044		return (0);
1045
1046	case FIOSETOWN:
1047		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1048
1049	case FIOGETOWN:
1050		*(int *)data = fgetown(mpipe->pipe_sigio);
1051		return (0);
1052
1053	/* This is deprecated, FIOSETOWN should be used instead. */
1054	case TIOCSPGRP:
1055		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1056
1057	/* This is deprecated, FIOGETOWN should be used instead. */
1058	case TIOCGPGRP:
1059		*(int *)data = -fgetown(mpipe->pipe_sigio);
1060		return (0);
1061
1062	}
1063	return (ENOTTY);
1064}
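/*
 * Editor's illustrative sketch, not part of the original file: the
 * FIONREAD case above as seen from userland.  Names and sizes are
 * arbitrary; only the ioctl request comes from the code above.
 *
 *	#include <sys/types.h>
 *	#include <sys/ioctl.h>
 *	#include <sys/filio.h>
 *	#include <stdio.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], nready = 0;
 *
 *		if (pipe(fds) == -1)
 *			return (1);
 *		(void)write(fds[1], "abc", 3);
 *		if (ioctl(fds[0], FIONREAD, &nready) == 0)
 *			printf("%d bytes buffered\n", nready);	// prints 3
 *		return (0);
 *	}
 */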
1065
1066int
1067pipe_poll(fp, events, cred, p)
1068	struct file *fp;
1069	int events;
1070	struct ucred *cred;
1071	struct proc *p;
1072{
1073	struct pipe *rpipe = (struct pipe *)fp->f_data;
1074	struct pipe *wpipe;
1075	int revents = 0;
1076
1077	wpipe = rpipe->pipe_peer;
1078	if (events & (POLLIN | POLLRDNORM))
1079		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1080		    (rpipe->pipe_buffer.cnt > 0) ||
1081		    (rpipe->pipe_state & PIPE_EOF))
1082			revents |= events & (POLLIN | POLLRDNORM);
1083
1084	if (events & (POLLOUT | POLLWRNORM))
1085		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1086		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1087		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1088			revents |= events & (POLLOUT | POLLWRNORM);
1089
1090	if ((rpipe->pipe_state & PIPE_EOF) ||
1091	    (wpipe == NULL) ||
1092	    (wpipe->pipe_state & PIPE_EOF))
1093		revents |= POLLHUP;
1094
1095	if (revents == 0) {
1096		if (events & (POLLIN | POLLRDNORM)) {
1097			selrecord(p, &rpipe->pipe_sel);
1098			rpipe->pipe_state |= PIPE_SEL;
1099		}
1100
1101		if (events & (POLLOUT | POLLWRNORM)) {
1102			selrecord(p, &wpipe->pipe_sel);
1103			wpipe->pipe_state |= PIPE_SEL;
1104		}
1105	}
1106
1107	return (revents);
1108}
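/*
 * Editor's illustrative sketch, not part of the original file: the poll
 * semantics implemented above, seen from userland.  With one byte queued,
 * the read end reports POLLIN and the write end reports POLLOUT (it has
 * at least PIPE_BUF bytes of space free).
 *
 *	#include <poll.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		struct pollfd pfd[2];
 *
 *		if (pipe(fds) == -1)
 *			return (1);
 *		(void)write(fds[1], "x", 1);
 *		pfd[0].fd = fds[0];
 *		pfd[0].events = POLLIN;
 *		pfd[1].fd = fds[1];
 *		pfd[1].events = POLLOUT;
 *		if (poll(pfd, 2, 0) == 2) {
 *			// pfd[0].revents has POLLIN, pfd[1].revents POLLOUT
 *		}
 *		return (0);
 *	}
 */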
1109
1110static int
1111pipe_stat(fp, ub, p)
1112	struct file *fp;
1113	struct stat *ub;
1114	struct proc *p;
1115{
1116	struct pipe *pipe = (struct pipe *)fp->f_data;
1117
1118	bzero((caddr_t)ub, sizeof(*ub));
1119	ub->st_mode = S_IFIFO;
1120	ub->st_blksize = pipe->pipe_buffer.size;
1121	ub->st_size = pipe->pipe_buffer.cnt;
1122	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1123	ub->st_atimespec = pipe->pipe_atime;
1124	ub->st_mtimespec = pipe->pipe_mtime;
1125	ub->st_ctimespec = pipe->pipe_ctime;
1126	ub->st_uid = fp->f_cred->cr_uid;
1127	ub->st_gid = fp->f_cred->cr_gid;
1128	/*
1129	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1130	 * XXX (st_dev, st_ino) should be unique.
1131	 */
1132	return (0);
1133}
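/*
 * Editor's illustrative sketch, not part of the original file: the fields
 * filled in above are what fstat(2) reports for a pipe descriptor --
 * st_size is the number of bytes currently buffered and st_blksize the
 * pipe buffer size.
 *
 *	#include <sys/types.h>
 *	#include <sys/stat.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2];
 *		struct stat sb;
 *
 *		if (pipe(fds) == -1)
 *			return (1);
 *		(void)write(fds[1], "abcd", 4);
 *		if (fstat(fds[0], &sb) == 0 && S_ISFIFO(sb.st_mode)) {
 *			// sb.st_size == 4; sb.st_blksize == buffer size
 *		}
 *		return (0);
 *	}
 */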
1134
1135/* ARGSUSED */
1136static int
1137pipe_close(fp, p)
1138	struct file *fp;
1139	struct proc *p;
1140{
1141	struct pipe *cpipe = (struct pipe *)fp->f_data;
1142
1143	fp->f_ops = &badfileops;
1144	fp->f_data = NULL;
1145	funsetown(cpipe->pipe_sigio);
1146	pipeclose(cpipe);
1147	return (0);
1148}
1149
1150static void
1151pipe_free_kmem(cpipe)
1152	struct pipe *cpipe;
1153{
1154
1155	mtx_assert(&vm_mtx, MA_OWNED);
1156	if (cpipe->pipe_buffer.buffer != NULL) {
1157		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1158			--nbigpipe;
1159		amountpipekva -= cpipe->pipe_buffer.size;
1160		kmem_free(kernel_map,
1161			(vm_offset_t)cpipe->pipe_buffer.buffer,
1162			cpipe->pipe_buffer.size);
1163		cpipe->pipe_buffer.buffer = NULL;
1164	}
1165#ifndef PIPE_NODIRECT
1166	if (cpipe->pipe_map.kva != 0) {
1167		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1168		kmem_free(kernel_map,
1169			cpipe->pipe_map.kva,
1170			cpipe->pipe_buffer.size + PAGE_SIZE);
1171		cpipe->pipe_map.cnt = 0;
1172		cpipe->pipe_map.kva = 0;
1173		cpipe->pipe_map.pos = 0;
1174		cpipe->pipe_map.npages = 0;
1175	}
1176#endif
1177}
1178
1179/*
1180 * shutdown the pipe
1181 */
1182static void
1183pipeclose(cpipe)
1184	struct pipe *cpipe;
1185{
1186	struct pipe *ppipe;
1187
1188	if (cpipe) {
1189
1190		pipeselwakeup(cpipe);
1191
1192		/*
1193		 * If the other side is blocked, wake it up saying that
1194		 * we want to close it down.
1195		 */
1196		while (cpipe->pipe_busy) {
1197			wakeup(cpipe);
1198			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1199			tsleep(cpipe, PRIBIO, "pipecl", 0);
1200		}
1201
1202		/*
1203		 * Disconnect from peer
1204		 */
1205		if ((ppipe = cpipe->pipe_peer) != NULL) {
1206			pipeselwakeup(ppipe);
1207
1208			ppipe->pipe_state |= PIPE_EOF;
1209			wakeup(ppipe);
1210			ppipe->pipe_peer = NULL;
1211		}
1212		/*
1213		 * free resources
1214		 */
1215		mtx_lock(&vm_mtx);
1216		pipe_free_kmem(cpipe);
1217		/* XXX: erm, doesn't zalloc already have its own locks and
1218		 * not need the giant vm lock?
1219		 */
1220		zfree(pipe_zone, cpipe);
1221		mtx_unlock(&vm_mtx);
1222	}
1223}
1224
1225/*ARGSUSED*/
1226static int
1227pipe_kqfilter(struct file *fp, struct knote *kn)
1228{
1229	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1230
1231	switch (kn->kn_filter) {
1232	case EVFILT_READ:
1233		kn->kn_fop = &pipe_rfiltops;
1234		break;
1235	case EVFILT_WRITE:
1236		kn->kn_fop = &pipe_wfiltops;
1237		break;
1238	default:
1239		return (1);
1240	}
1241
1242	SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
1243	return (0);
1244}
1245
1246static void
1247filt_pipedetach(struct knote *kn)
1248{
1249	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1250
1251	SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1252}
1253
1254/*ARGSUSED*/
1255static int
1256filt_piperead(struct knote *kn, long hint)
1257{
1258	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1259	struct pipe *wpipe = rpipe->pipe_peer;
1260
1261	kn->kn_data = rpipe->pipe_buffer.cnt;
1262	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1263		kn->kn_data = rpipe->pipe_map.cnt;
1264
1265	if ((rpipe->pipe_state & PIPE_EOF) ||
1266	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1267		kn->kn_flags |= EV_EOF;
1268		return (1);
1269	}
1270	return (kn->kn_data > 0);
1271}
1272
1273/*ARGSUSED*/
1274static int
1275filt_pipewrite(struct knote *kn, long hint)
1276{
1277	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1278	struct pipe *wpipe = rpipe->pipe_peer;
1279
1280	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1281		kn->kn_data = 0;
1282		kn->kn_flags |= EV_EOF;
1283		return (1);
1284	}
1285	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1286	if (wpipe->pipe_state & PIPE_DIRECTW)
1287		kn->kn_data = 0;
1288
1289	return (kn->kn_data >= PIPE_BUF);
1290}
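/*
 * Editor's illustrative sketch, not part of the original file: registering
 * the EVFILT_READ filter implemented above through kqueue(2).  The data
 * field of the returned event is kn_data from filt_piperead(), i.e. the
 * number of bytes available for reading.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <sys/time.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		int fds[2], kq;
 *		struct kevent kev, ev;
 *
 *		if (pipe(fds) == -1 || (kq = kqueue()) == -1)
 *			return (1);
 *		EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *		if (kevent(kq, &kev, 1, NULL, 0, NULL) == -1)
 *			return (1);
 *		(void)write(fds[1], "hello", 5);
 *		if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) {
 *			// ev.data == 5, bytes ready on the read end
 *		}
 *		return (0);
 *	}
 */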
1291