sys_pipe.c revision 27923
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.30 1997/08/05 00:05:00 dyson Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does everything that pipes normally do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the amount written is less than PIPE_MINDIRECT,
 * the "normal" pipe buffering is done.  If it is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, the writer's buffer is fully mapped and wired
 * into the kernel, and the receiving process copies directly from the
 * pages of the sending process.
 *
 * If the sending process receives a signal, it may return to user mode
 * and possibly go away, and its address space can certainly change.
 * In that case, the pipe code arranges to copy the buffer supplied by
 * the user process into a pageable kernel buffer, and the receiving
 * process grabs the data from that pageable kernel buffer.  Since
 * signals don't happen all that often, the copy operation is normally
 * eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
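
/*
 * For illustration only, not part of the kernel build: a minimal
 * userland sketch of the two write modes described above.  Assuming
 * the historical values of PIPE_MINDIRECT (8K) and PIPE_SIZE (16K)
 * from <sys/pipe.h>, the first write below is small enough to be
 * buffered, while the second is a candidate for the direct,
 * page-wired path.
 */
#if 0
#include <unistd.h>

static void
example_write_modes(int fd)
{
	char small[512];	/* < PIPE_MINDIRECT: copied into kernel buffer */
	char large[16384];	/* >= PIPE_MINDIRECT: eligible for direct map */

	(void) write(fd, small, sizeof(small));	/* small write mode */
	(void) write(fd, large, sizeof(large));	/* large write mode */
}
#endif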

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approximately 30% decrease in transfer rate.  This could be useful
 * for NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data,
		struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };
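
/*
 * Illustration (a sketch, not code from this file): the generic
 * descriptor layer dispatches through the table above, so a read(2)
 * on a pipe reaches pipe_read() roughly as:
 *
 *	struct file *fp = fdp->fd_ofiles[fd];
 *	error = (*fp->f_ops->fo_read)(fp, &auio, fp->f_cred);
 *
 * which is the same dispatch path used for sockets and vnodes.
 */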

/*
 * Default pipe buffer size(s); this can be fairly large now because
 * pipe space is pageable.  The pipe code will try to maintain locality
 * of reference for performance reasons, so small amounts of
 * outstanding I/O will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind of a soft limit, but
 * it is there so that we do not exhaust kva on large systems.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit the amount
 * of kva for pipes in general, though.
 */
#define LIMITPIPEKVA (16*1024*1024)
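
/*
 * Worked numbers, for illustration (assuming the historical PIPE_SIZE
 * of 16K from <sys/pipe.h>):
 *
 *	MINPIPESIZE = 16384 / 3     =  5461 bytes
 *	MAXPIPESIZE = 2 * 16384 / 3 = 10922 bytes
 *	MAXPIPEKVA / PIPE_SIZE      =   512 ordinary pipe buffers
 *
 * so roughly 512 fully-backed ordinary pipes fit under the soft kva
 * limit before the code starts releasing cached direct-write kva
 * (see pipe_destroy_write_buffer()).
 */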

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof (struct pipe), 0, 0, 4);

	rpipe = zalloc(pipe_zone);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = zalloc(pipe_zone);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}
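
/*
 * Userland view of the syscall above (a standalone sketch, not part
 * of this file): retval[0] and retval[1] surface as fildes[0], the
 * read end, and fildes[1], the write end.
 */
#if 0
#include <unistd.h>

int
example_pipe(void)
{
	int fildes[2];
	char buf[6];

	if (pipe(fildes) == -1)
		return (-1);
	(void) write(fildes[1], "hello", 6);		/* write end */
	(void) read(fildes[0], buf, sizeof(buf));	/* read end */
	(void) close(fildes[0]);
	(void) close(fildes[1]);
	return (0);
}
#endif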

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	gettime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}

/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		error = tsleep(cpipe, catch ? (PRIBIO|PCATCH) : PRIBIO,
			"pipelk", 0);
		if (error != 0)
			return error;
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva +
					rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0);
			if (error != 0) {
				break;
			}
		}
	}

	if (error == 0)
		gettime(&rpipe->pipe_atime);

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i++) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

	/*
	 * set up the control block
	 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

	/*
	 * and map the buffer
	 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

	/*
	 * and update the uio data
	 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}
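
/*
 * Worked example of the wiring arithmetic above (the addresses are
 * hypothetical, assuming 4K pages): an 8K user buffer starting at
 * 0x1000f00 spans
 *
 *	trunc_page(0x1000f00)        = 0x1000000
 *	round_page(0x1000f00 + 8192) = 0x1003000
 *
 * i.e. three physical pages for an 8K transfer, with pipe_map.pos set
 * to 0xf00.  This page straddling is why the kva window allocated
 * above is one page larger than the pipe buffer size.
 */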

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;
	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;

#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    ((fp->f_flag & FNONBLOCK) == 0) &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincidental with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer to one contiguous
			 * segment of the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0);
			if (error != 0)
				break;
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		gettime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}
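
/*
 * Userland sketch of the FIONREAD case above (standalone, not part of
 * this file): it reports readable bytes whether they sit in the
 * circular buffer or in a pending direct write.
 */
#if 0
#include <sys/ioctl.h>

int
example_fionread(int fd)
{
	int nbytes;

	if (ioctl(fd, FIONREAD, &nbytes) == -1)
		return (-1);
	return (nbytes);	/* bytes currently readable from the pipe */
}
#endif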

static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}
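
/*
 * Userland sketch of select(2) against a pipe (standalone, not part
 * of this file): readability follows the FREAD case above -- data in
 * the buffer, a pending direct write, or EOF.
 */
#if 0
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

int
example_select(int rfd)
{
	fd_set rset;

	FD_ZERO(&rset);
	FD_SET(rfd, &rset);
	/* Blocks until the read side of the pipe becomes ready. */
	return (select(rfd + 1, &rset, NULL, NULL, NULL));
}
#endif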

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		zfree(pipe_zone, cpipe);
	}
}