sys_pipe.c revision 14037
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.10 1996/02/09 04:36:36 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the buffer is fully mapped and wired into the
 * kernel, and the receiving process can copy it directly from the pages
 * in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
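
/*
 * Illustrative sketch (not part of the original source): the choice
 * between the two modes is driven purely by the size of each write.
 * Assuming the usual PIPE_MINDIRECT definition from <sys/pipe.h>:
 *
 *	char small[512], large[65536];
 *	write(fd[1], small, sizeof(small));	- smaller than PIPE_MINDIRECT,
 *						  copied via the kernel buffer
 *	write(fd[1], large, sizeof(large));	- at least PIPE_MINDIRECT,
 *						  pages wired and mapped
 *						  directly, a chunk at a time
 */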

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect
 * roughly a 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general, though.
 */
#define LIMITPIPEKVA (16*1024*1024)
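/*
 * Amount of kva, in bytes, currently consumed by pipe buffers and by
 * kva mapped for direct writes.
 */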
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof(*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof(*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
free1:
	pipeclose(rpipe);
	return (error);
}
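
/*
 * Illustrative sketch (not part of the original source): typical userland
 * use of the system call above.  retval[0] is conventionally the read end
 * and retval[1] the write end.
 *
 *	int fd[2];
 *	char buf[6];
 *
 *	if (pipe(fd) == -1)
 *		err(1, "pipe");
 *	write(fd[1], "hello", 6);
 *	read(fd[0], buf, 6);
 */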

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * Initialize a pipe.  Kva for the buffer itself is allocated lazily,
 * by pipespace(), on the first write.
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(&cpipe->pipe_state,
			catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
}

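/*
 * Wake up any select(2) waiters recorded against this pipe.
 */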
static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif

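/*
 * Read from a pipe: drain the kernel buffer first, then any pending
 * direct write, sleeping as needed.
 */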
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();

		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > 0)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
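/*
 * Illustrative arithmetic (not part of the original source), assuming
 * 4KB pages: an 8192-byte buffer starting at user address 0x1234 spans
 * the pages at 0x1000, 0x2000, and 0x3000, since trunc_page(0x1234) is
 * 0x1000 and round_page(0x1234 + 8192) is 0x4000.  Three pages must be
 * wired for a two-page transfer, which is why the kva window allocated
 * below is one page larger than the pipe buffer.
 */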
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			wpipe->pipe_state &= ~PIPE_DIRECTW;
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

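/*
 * The buffered write path (and dispatcher to the direct path above):
 * copy data into the circular kernel buffer, sleeping when it fills.
 */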
static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * Detect loss of pipe read side; return EPIPE (the caller turns
	 * this into SIGPIPE).
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/*
		 * We must afford contiguous writes on buffers of size
		 * PIPE_BUF or less.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();

		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt > 0)
		pipeselwakeup(wpipe);

	return error;
}

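/*
 * Write entry point: locate the peer (write side) pipe and hand off to
 * pipewrite() with the non-blocking flag.
 */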
/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
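
/*
 * Illustrative sketch (not part of the original source): userland sets a
 * pipe non-blocking through the FIONBIO case below.
 *
 *	int on = 1;
 *	ioctl(fd[1], FIONBIO, &on);	- later writes that would block
 *					  fail with EAGAIN instead
 */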
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

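/*
 * Select/poll support: report readability, writability, or, for
 * which == 0, exceptional conditions.
 */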
static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0 ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

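	/* exceptional conditions: EOF on either end of the pipe */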
	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

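/*
 * Fake up stat(2) information for a pipe; it is reported as a socket,
 * as the old socket-based pipe implementation did.
 */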
int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shut down the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif