sys_pipe.c revision 22975
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id$
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
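
/*
 * Illustrative sketch (not part of the pipe implementation; buffer
 * sizes are examples only -- the real thresholds are PIPE_MINDIRECT
 * and PIPE_SIZE from <sys/pipe.h>): how a user process exercises the
 * two modes described above.
 *
 *	int fds[2];
 *	char small[128];	small write: buffered through the kernel
 *	char big[16384];	large write: eligible for the direct,
 *				page-wiring path (if >= PIPE_MINDIRECT)
 *
 *	pipe(fds);
 *	write(fds[1], small, sizeof(small));
 *	write(fds[1], big, sizeof(big));
 */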

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s).  This can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

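/*
 * Worked example (assuming the usual PIPE_SIZE of 16384 from
 * <sys/pipe.h>; the actual value may differ):
 *
 *	MINPIPESIZE = 16384/3   = 5461 bytes
 *	MAXPIPESIZE = 2*16384/3 = 10922 bytes
 *
 * pipe_read() uses MINPIPESIZE below: once fewer than that many bytes
 * remain buffered, a sleeping writer is woken up to refill the pipe.
 */
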
/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */
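
/*
 * Userland sketch (illustrative only): the syscall returns the two
 * descriptors through retval[], which libc copies into the caller's
 * array.
 *
 *	int fds[2];
 *	char c;
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	(void)write(fds[1], "x", 1);	fds[1] is the write end
 *	(void)read(fds[0], &c, 1);	fds[0] is the read end
 */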

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
	(void)pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

234
235/*
236 * initialize and allocate VM and memory for pipe
237 */
238static void
239pipeinit(cpipe)
240	struct pipe *cpipe;
241{
242	int s;
243
244	cpipe->pipe_buffer.in = 0;
245	cpipe->pipe_buffer.out = 0;
246	cpipe->pipe_buffer.cnt = 0;
247	cpipe->pipe_buffer.size = PIPE_SIZE;
248
249	/* Buffer kva gets dynamically allocated */
250	cpipe->pipe_buffer.buffer = NULL;
251	/* cpipe->pipe_buffer.object = invalid */
252
253	cpipe->pipe_state = 0;
254	cpipe->pipe_peer = NULL;
255	cpipe->pipe_busy = 0;
256	s = splhigh();
257	cpipe->pipe_ctime = time;
258	cpipe->pipe_atime = time;
259	cpipe->pipe_mtime = time;
260	splx(s);
261	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
262	cpipe->pipe_pgid = NO_PID;
263
264#ifndef PIPE_NODIRECT
265	/*
266	 * pipe data structure initializations to support direct pipe I/O
267	 */
268	cpipe->pipe_map.cnt = 0;
269	cpipe->pipe_map.kva = 0;
270	cpipe->pipe_map.pos = 0;
271	cpipe->pipe_map.npages = 0;
272	/* cpipe->pipe_map.ms[] = invalid */
273#endif
274}
275
276
/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(cpipe,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}
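
/*
 * Usage sketch (for exposition): pipelock/pipeunlock bracket every
 * access to the pipe contents, exactly as the read and write paths
 * below do.  With catch != 0 the sleep is interruptible, so the error
 * must be propagated:
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... touch cpipe->pipe_buffer ...
 *		pipeunlock(cpipe);
 *	}
 */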

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
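
/*
 * Sequence sketch (a summary of the code below, not additional
 * behavior): the writer waits for any previous direct write and for
 * the pipe buffer to drain, sets PIPE_DIRECTW and wires its pages via
 * pipe_build_write_buffer(), then sleeps while the reader consumes
 * pipe_map directly; the reader clears PIPE_DIRECTW when pipe_map.cnt
 * reaches zero and wakes the writer.  If the writer is interrupted by
 * a signal first, pipe_clone_write_buffer() snapshots the data into
 * the kernel buffer so the wired user pages can be released.
 */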
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side; issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}


	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot run concurrently with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer to one contiguous
			 * segment of the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer;
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If the read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
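
/*
 * Userland sketch (illustrative; fds[0] as in the earlier pipe(2)
 * sketch): the FIONREAD case below is what makes this work on a pipe
 * descriptor.
 *
 *	int nbytes;
 *
 *	if (ioctl(fds[0], FIONREAD, &nbytes) == 0)
 *		printf("%d bytes buffered\n", nbytes);
 */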
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif