sys_pipe.c revision 16960
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.17 1996/06/17 05:15:01 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation: a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE bytes, the writer's buffer is fully mapped and wired into
 * the kernel, and the receiving process can copy it directly from the
 * pages in the sending process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */
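/*
 * Illustrative userspace sketch (an assumption for documentation
 * purposes, not part of this file): a single blocking write(2) of
 * PIPE_MINDIRECT bytes or more is what engages the direct-map path
 * described above, e.g.:
 *
 *	int fds[2];
 *	char buf[8192];			-- assuming PIPE_MINDIRECT is 8192
 *
 *	pipe(fds);
 *	write(fds[1], buf, sizeof buf);	-- candidate for a direct write
 *	read(fds[0], buf, sizeof buf);	-- reader copies straight from the
 *					   sender's wired pages
 */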

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)
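/*
 * Assuming PIPE_SIZE is 16384 bytes (its usual value in <sys/pipe.h>;
 * an assumption here), these work out to 5461 and 10922 bytes,
 * respectively.
 */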

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit on pipe kva above which direct transfers are not attempted; we
 * cannot, of course, limit the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipespace: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep( cpipe,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
	return;
}
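
/*
 * The canonical usage pattern for the pair above, as seen throughout
 * this file:
 *
 *	if ((error = pipelock(cpipe, 1)) == 0) {
 *		... manipulate the pipe buffer ...
 *		pipeunlock(cpipe);
 *	}
 */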

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for(off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;
	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
			(caddr_t) wpipe->pipe_buffer.buffer,
			size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
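
/*
 * Summary of the PIPE_DIRECTW handshake implemented above: the writer
 * drains any buffered data, sets PIPE_DIRECTW, wires and maps its user
 * pages, and sleeps in "pipdwt"; the reader copies directly from that
 * mapping and clears PIPE_DIRECTW once it has consumed everything.  If
 * the writer is instead awakened by a signal while PIPE_DIRECTW is
 * still set, pipe_clone_write_buffer() preserves the data in the
 * pageable kernel buffer before the user pages are unwired.
 */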
#endif

static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;

	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((fp->f_flag & FNONBLOCK) == 0 &&
			(amountpipekva < LIMITPIPEKVA) &&
			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

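/*
 * Illustrative userspace use of the FIONREAD ioctl above (an
 * assumption for documentation purposes, not part of this file):
 *
 *	int n;
 *
 *	if (ioctl(fds[0], FIONREAD, &n) == 0)
 *		printf("%d bytes ready\n", n);
 */

/*
 * A pipe is selectable for read when data or EOF is pending, for write
 * when at least PIPE_BUF bytes of buffer space are free (or on EOF),
 * and for exceptional conditions when either side is at EOF or the
 * peer is gone.
 */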
static int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
			(rpipe->pipe_buffer.cnt > 0) ||
			(rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF) ||
			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
			(wpipe == NULL) ||
			(wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

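/*
 * Fill in stat(2) information for a pipe.  The S_IFSOCK mode keeps
 * compatibility with the socket-based implementation this file
 * replaces.
 */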
int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif