sys_pipe.c revision 13909
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. This work was done expressly for inclusion into FreeBSD.  Other use
 *    is allowed if this notation is included.
 * 5. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, then
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the user buffer is fully mapped and wired into the
 * kernel, and the receiving process can copy the data directly from the
 * pages of the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and its address space can certainly change, because control
 * is returned to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
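
/*
 * Illustrative sketch (not compiled into the kernel): from a user
 * process's point of view, the choice between the two modes above is
 * driven purely by the size of each write.  The plain pipe(2)
 * producer/consumer below would stage the small write through the
 * kernel buffer, while a write of at least PIPE_MINDIRECT bytes (see
 * sys/pipe.h for the actual value) is a candidate for the direct,
 * page-wired transfer.  The buffer sizes are arbitrary examples, not
 * values taken from this file.
 */
#if 0
#include <sys/types.h>
#include <sys/wait.h>
#include <string.h>
#include <unistd.h>

static int
example_pipe_modes(void)
{
	int fds[2];
	char small[128];			/* well under PIPE_MINDIRECT */
	static char large[64 * 1024];		/* assumed >= PIPE_MINDIRECT */
	char sink[8192];
	pid_t pid;

	if (pipe(fds) < 0)
		return (1);
	memset(small, 'a', sizeof(small));
	memset(large, 'b', sizeof(large));

	pid = fork();
	if (pid == 0) {
		/* child: drain the pipe until EOF */
		close(fds[1]);
		while (read(fds[0], sink, sizeof(sink)) > 0)
			;
		_exit(0);
	}
	/* parent: one buffered write, one direct-write candidate */
	close(fds[0]);
	write(fds[1], small, sizeof(small));
	write(fds[1], large, sizeof(large));
	close(fds[1]);
	waitpid(pid, NULL, 0);
	return (0);
}
#endif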

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be fairly large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is a soft limit, but it is
 * there so that we do not exhaust kernel virtual memory on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
free1:
	(void)pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep( &cpipe->pipe_state,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
	return;
}

#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
			rpipe->pipe_atime = time;
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_atime = time;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}
	if (rpipe->pipe_state & PIPE_SEL) {
		rpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&rpipe->pipe_sel);
	}
	return error;
}

/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size);
		amountpipekva += wpipe->pipe_buffer.size;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;
	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size);
			amountpipekva -= wpipe->pipe_buffer.size;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	wpipe->pipe_state |= PIPE_DIRECTW;
	while (wpipe->pipe_buffer.cnt > 0) {
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			wpipe->pipe_state &= ~PIPE_DIRECTW;
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
	}

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	if (wpipe->pipe_state & PIPE_WANTR) {
		wpipe->pipe_state &= ~PIPE_WANTR;
		wakeup(wpipe);
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	while (uio->uio_resid) {
		int space;
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write( wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/*
		 * We must guarantee contiguous (atomic) writes of
		 * PIPE_BUF bytes or less.
		 */
		if ((space > 0) &&
			((uio->uio_resid > PIPE_BUF) || (uio->uio_resid <= space))) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
			wpipe->pipe_mtime = time;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (wpipe->pipe_state & PIPE_SEL) {
		wpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&wpipe->pipe_sel);
	}

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}
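
/*
 * Illustrative sketch (not compiled into the kernel): how a user process
 * would exercise the ioctls above on a pipe descriptor.  FIONBIO toggles
 * PIPE_NBIO (non-blocking I/O) and FIONREAD reports how many bytes are
 * currently buffered.  Error handling is omitted for brevity, and the
 * helper name is an arbitrary example.
 */
#if 0
#include <sys/ioctl.h>
#include <unistd.h>

static int
example_pipe_ioctls(void)
{
	int fds[2];
	int on = 1;
	int nbytes = 0;
	char c = 'x';

	if (pipe(fds) < 0)
		return (-1);
	ioctl(fds[0], FIONBIO, &on);		/* empty reads now return EAGAIN */
	write(fds[1], &c, 1);			/* one byte into the kernel buffer */
	ioctl(fds[0], FIONREAD, &nbytes);	/* nbytes should now be 1 */
	close(fds[0]);
	close(fds[1]);
	return (nbytes);
}
#endif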

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	register int s = splnet();

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0 ||
			(rpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			splx(s);
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			splx(s);
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	splx(s);
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;
	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shut down the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		if (cpipe->pipe_state & PIPE_SEL) {
			cpipe->pipe_state &= ~PIPE_SEL;
			selwakeup(&cpipe->pipe_sel);
		}

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			if (ppipe->pipe_state & PIPE_SEL) {
				ppipe->pipe_state &= ~PIPE_SEL;
				selwakeup(&ppipe->pipe_sel);
			}

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size);
		}
		free(cpipe, M_TEMP);
	}
}
#endif