sys_pipe.c revision 13774
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. This work was done expressly for inclusion into FreeBSD.  Other use
 *    is allowed if this notation is included.
 * 5. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.2 1996/01/29 02:57:33 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but it does everything that pipes normally do.
 */

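/*
 * For context, a minimal userland sketch of the interface this file
 * backs (illustrative only, not part of the kernel build): pipe(2)
 * returns two descriptors; data written on fds[1] is queued in the
 * in-kernel buffer managed below and read back from fds[0].
 *
 *	int fds[2];
 *	char buf[16];
 *	ssize_t n;
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	write(fds[1], "hi", 2);
 *	n = read(fds[0], buf, sizeof(buf));
 *	close(fds[0]);
 *	close(fds[1]);
 */
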
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>

static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); these can be fairly large now because pipe
 * space is pageable.  The pipe code tries to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define PIPESIZE (16384)
#define MINPIPESIZE (PIPESIZE/3)
#define MAXPIPESIZE (2*PIPESIZE/3)
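
/*
 * Of these, only PIPESIZE and MINPIPESIZE are referenced below:
 * MINPIPESIZE is the low-water mark at which pipe_read() rewinds the
 * ring buffer's in/out offsets to 0 once it has fully drained, so that
 * subsequent transfers stay cache-friendly.  MAXPIPESIZE is defined but
 * unused in this revision.
 */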

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe));
static __inline void pipeunlock __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

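/*
 * Allocates two struct pipes, wraps each in a DTYPE_PIPE file, installs
 * two descriptors in the caller's file table (retval[0] and retval[1])
 * and cross-links the pipes through pipe_peer, so data written on
 * retval[1] is read back from retval[0].  The free3/free2/free1 labels
 * unwind whatever had been set up when an falloc() fails.
 */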
/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
free1:
	(void)pipeclose(rpipe);
	return (error);
}


/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(PIPESIZE)/PAGE_SIZE;

	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer, PIPESIZE, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPESIZE;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe)
	struct pipe *cpipe;
{
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if (tsleep( &cpipe->pipe_state, PRIBIO|PCATCH, "pipelk", 0)) {
			return ERESTART;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
	return;
}

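/*
 * Read side of the pipe.  Data is copied out of the circular pipe
 * buffer (pipe_buffer.out wraps at pipe_buffer.size) until the request
 * is satisfied or the buffer empties.  On an empty buffer the read
 * returns whatever it already has, reports EOF if the writer is gone,
 * fails with EAGAIN on a non-blocking descriptor, or sleeps with
 * PIPE_WANTR set until a writer wakes it.  A drained buffer has its
 * in/out offsets rewound to 0 to keep transfers cache-friendly.
 */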
/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		if (rpipe->pipe_buffer.cnt > 0) {
			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
			rpipe->pipe_atime = time;
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}
			if (rpipe->pipe_peer == NULL)
				break;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if (tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				error = ERESTART;
				break;
			}
		}
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}
	if (rpipe->pipe_state & PIPE_SEL) {
		rpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&rpipe->pipe_sel);
	}
	return error;
}

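/*
 * Write side of the pipe.  Data is copied from the uio into the peer's
 * circular buffer (pipe_buffer.in wraps at pipe_buffer.size).  When the
 * buffer is full a non-blocking writer just returns, while a blocking
 * writer wakes any waiting reader, sets PIPE_WANTW and sleeps until
 * space appears.  A write to a pipe whose read side is gone raises
 * SIGPIPE and fails with EPIPE.
 */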
/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	int error = 0;

	/*
	 * detect loss of the pipe read side; issue SIGPIPE if it is gone.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		psignal(curproc, SIGPIPE);
		return EPIPE;
	}

	++wpipe->pipe_busy;
	while (uio->uio_resid) {
		int space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe)) == 0) {
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
			wpipe->pipe_mtime = time;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			/*
			 * don't block on non-blocking I/O
			 */
			if (wpipe->pipe_state & PIPE_NBIO) {
				break;
			}
			wpipe->pipe_state |= PIPE_WANTW;
			if (tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
				error = ERESTART;
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				psignal(curproc, SIGPIPE);
				error = EPIPE;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}
	if (wpipe->pipe_state & PIPE_SEL) {
		wpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&wpipe->pipe_sel);
	}
	return error;
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return ENOSYS;
}

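/*
 * select(2) backend.  A pipe selects readable when its buffer holds
 * data and writable when the peer's buffer has room (or the peer is
 * gone).  Otherwise the selecting process is recorded with selrecord()
 * and PIPE_SEL is set so the read/write paths know to selwakeup() it
 * later.
 */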
int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;
	register int s = splnet();

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0) {
			splx(s);
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if (wpipe == 0) {
			splx(s);
			return (1);
		}
		if (wpipe->pipe_buffer.cnt < wpipe->pipe_buffer.size) {
			splx(s);
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	splx(s);
	return (0);
}

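/*
 * fstat(2) on a pipe: reports the current buffer occupancy as st_size
 * and, like the socket-based pipes this file replaces, presents the
 * descriptor as S_IFSOCK.
 */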
int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size / 2;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

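/*
 * fileops close entry: tear down this end of the pipe and clear the
 * file's reference to it.
 */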
/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	int error = 0;
	struct pipe *cpipe = (struct pipe *)fp->f_data;
	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	if (cpipe) {
		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if (cpipe->pipe_peer) {
			cpipe->pipe_peer->pipe_state |= PIPE_EOF;
			wakeup(cpipe->pipe_peer);
			cpipe->pipe_peer->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		kmem_free(kernel_map, (vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		free(cpipe, M_TEMP);
	}
}
#endif