sys_pipe.c: revision 92305 vs. revision 92654 (changed lines are marked "(deleted)" and "(added)" below)
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice immediately at the beginning of the file, without modification,
10 * this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 * John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 * are met.
18 *
 19 * $FreeBSD: head/sys/kern/sys_pipe.c 92305 2002-03-15 07:18:09Z alfred $	(deleted)
 19 * $FreeBSD: head/sys/kern/sys_pipe.c 92654 2002-03-19 09:11:49Z jeff $	(added)
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode. The small write mode acts like conventional pipes with
32 * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
35 * the receiving process can copy it directly from the pages in the sending
36 * process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned back to the user-mode side. In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process, to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer. Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching. PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/malloc.h>
64#include <sys/poll.h>
65#include <sys/selinfo.h>
66#include <sys/signalvar.h>
67#include <sys/sysproto.h>
68#include <sys/pipe.h>
69#include <sys/proc.h>
70#include <sys/vnode.h>
71#include <sys/uio.h>
72#include <sys/event.h>
73
74#include <vm/vm.h>
75#include <vm/vm_param.h>
76#include <vm/vm_object.h>
77#include <vm/vm_kern.h>
78#include <vm/vm_extern.h>
79#include <vm/pmap.h>
80#include <vm/vm_map.h>
81#include <vm/vm_page.h>
82#include <vm/vm_zone.h>
83
84/*
85 * Use this define if you want to disable *fancy* VM things. Expect an
86 * approx 30% decrease in transfer rate. This could be useful for
87 * NetBSD or OpenBSD.
88 */
89/* #define PIPE_NODIRECT */
90
91/*
92 * interfaces to the outside world
93 */
94static int pipe_read(struct file *fp, struct uio *uio,
95 struct ucred *cred, int flags, struct thread *td);
96static int pipe_write(struct file *fp, struct uio *uio,
97 struct ucred *cred, int flags, struct thread *td);
98static int pipe_close(struct file *fp, struct thread *td);
99static int pipe_poll(struct file *fp, int events, struct ucred *cred,
100 struct thread *td);
101static int pipe_kqfilter(struct file *fp, struct knote *kn);
102static int pipe_stat(struct file *fp, struct stat *sb, struct thread *td);
103static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td);
104
105static struct fileops pipeops = {
106 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
107 pipe_stat, pipe_close
108};
109
110static void filt_pipedetach(struct knote *kn);
111static int filt_piperead(struct knote *kn, long hint);
112static int filt_pipewrite(struct knote *kn, long hint);
113
114static struct filterops pipe_rfiltops =
115 { 1, NULL, filt_pipedetach, filt_piperead };
116static struct filterops pipe_wfiltops =
117 { 1, NULL, filt_pipedetach, filt_pipewrite };
118
119#define PIPE_GET_GIANT(pipe) \
120 do { \
121 KASSERT(((pipe)->pipe_state & PIPE_LOCKFL) != 0, \
122 ("%s:%d PIPE_GET_GIANT: line pipe not locked", \
123 __FILE__, __LINE__)); \
124 PIPE_UNLOCK(pipe); \
125 mtx_lock(&Giant); \
126 } while (0)
127
128#define PIPE_DROP_GIANT(pipe) \
129 do { \
130 mtx_unlock(&Giant); \
131 PIPE_LOCK(pipe); \
132 } while (0)
133
134/*
135 * Default pipe buffer size(s), this can be kind-of large now because pipe
136 * space is pageable. The pipe code will try to maintain locality of
137 * reference for performance reasons, so small amounts of outstanding I/O
138 * will not wipe the cache.
139 */
140#define MINPIPESIZE (PIPE_SIZE/3)
141#define MAXPIPESIZE (2*PIPE_SIZE/3)
142
143/*
144 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
145 * is there so that on large systems, we don't exhaust it.
146 */
147#define MAXPIPEKVA (8*1024*1024)
148
149/*
150 * Limit for direct transfers, we cannot, of course limit
151 * the amount of kva for pipes in general though.
152 */
153#define LIMITPIPEKVA (16*1024*1024)
154
155/*
156 * Limit the number of "big" pipes
157 */
158#define LIMITBIGPIPES 32
159static int nbigpipe;
160
161static int amountpipekva;
162
163static void pipeinit(void *dummy __unused);
164static void pipeclose(struct pipe *cpipe);
165static void pipe_free_kmem(struct pipe *cpipe);
166static int pipe_create(struct pipe **cpipep);
167static __inline int pipelock(struct pipe *cpipe, int catch);
168static __inline void pipeunlock(struct pipe *cpipe);
169static __inline void pipeselwakeup(struct pipe *cpipe);
170#ifndef PIPE_NODIRECT
171static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
172static void pipe_destroy_write_buffer(struct pipe *wpipe);
173static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
174static void pipe_clone_write_buffer(struct pipe *wpipe);
175#endif
176static int pipespace(struct pipe *cpipe, int size);
177
178static vm_zone_t pipe_zone;
179
180SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
181
182static void
183pipeinit(void *dummy __unused)
184{
 185	(deleted)
 186	pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);	(deleted)
 185	pipe_zone = uma_zcreate("PIPE", sizeof(struct pipe), NULL,	(added)
 186	    NULL, NULL, NULL, UMA_ALIGN_PTR, 0);	(added)
187}
188
189/*
190 * The pipe system call for the DTYPE_PIPE type of pipes
191 */
192
193/* ARGSUSED */
194int
195pipe(td, uap)
196 struct thread *td;
197 struct pipe_args /* {
198 int dummy;
199 } */ *uap;
200{
201 struct filedesc *fdp = td->td_proc->p_fd;
202 struct file *rf, *wf;
203 struct pipe *rpipe, *wpipe;
204 struct mtx *pmtx;
205 int fd, error;
206
207 KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
208
209 pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
210
211 rpipe = wpipe = NULL;
212 if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
213 pipeclose(rpipe);
214 pipeclose(wpipe);
215 free(pmtx, M_TEMP);
216 return (ENFILE);
217 }
218
219 rpipe->pipe_state |= PIPE_DIRECTOK;
220 wpipe->pipe_state |= PIPE_DIRECTOK;
221
222 error = falloc(td, &rf, &fd);
223 if (error) {
224 pipeclose(rpipe);
225 pipeclose(wpipe);
226 free(pmtx, M_TEMP);
227 return (error);
228 }
229 fhold(rf);
230 td->td_retval[0] = fd;
231
232 /*
233 * Warning: once we've gotten past allocation of the fd for the
234 * read-side, we can only drop the read side via fdrop() in order
235 * to avoid races against processes which manage to dup() the read
236 * side while we are blocked trying to allocate the write side.
237 */
238 FILE_LOCK(rf);
239 rf->f_flag = FREAD | FWRITE;
240 rf->f_type = DTYPE_PIPE;
241 rf->f_data = (caddr_t)rpipe;
242 rf->f_ops = &pipeops;
243 FILE_UNLOCK(rf);
244 error = falloc(td, &wf, &fd);
245 if (error) {
246 FILEDESC_LOCK(fdp);
247 if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
248 fdp->fd_ofiles[td->td_retval[0]] = NULL;
249 FILEDESC_UNLOCK(fdp);
250 fdrop(rf, td);
251 } else
252 FILEDESC_UNLOCK(fdp);
253 fdrop(rf, td);
254 /* rpipe has been closed by fdrop(). */
255 pipeclose(wpipe);
256 free(pmtx, M_TEMP);
257 return (error);
258 }
259 FILE_LOCK(wf);
260 wf->f_flag = FREAD | FWRITE;
261 wf->f_type = DTYPE_PIPE;
262 wf->f_data = (caddr_t)wpipe;
263 wf->f_ops = &pipeops;
264 FILE_UNLOCK(wf);
265 td->td_retval[1] = fd;
266 rpipe->pipe_peer = wpipe;
267 wpipe->pipe_peer = rpipe;
268 mtx_init(pmtx, "pipe mutex", MTX_DEF);
269 rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
270 fdrop(rf, td);
271
272 return (0);
273}
274
275/*
276 * Allocate kva for pipe circular buffer, the space is pageable
277 * This routine will 'realloc' the size of a pipe safely, if it fails
278 * it will retain the old buffer.
279 * If it fails it will return ENOMEM.
280 */
281static int
282pipespace(cpipe, size)
283 struct pipe *cpipe;
284 int size;
285{
286 struct vm_object *object;
287 caddr_t buffer;
288 int npages, error;
289
290 GIANT_REQUIRED;
291 KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
292 ("pipespace: pipe mutex locked"));
293
294 npages = round_page(size)/PAGE_SIZE;
295 /*
296 * Create an object, I don't like the idea of paging to/from
297 * kernel_object.
298 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
299 */
300 object = vm_object_allocate(OBJT_DEFAULT, npages);
301 buffer = (caddr_t) vm_map_min(kernel_map);
302
303 /*
304 * Insert the object into the kernel map, and allocate kva for it.
305 * The map entry is, by default, pageable.
306 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
307 */
308 error = vm_map_find(kernel_map, object, 0,
309 (vm_offset_t *) &buffer, size, 1,
310 VM_PROT_ALL, VM_PROT_ALL, 0);
311
312 if (error != KERN_SUCCESS) {
313 vm_object_deallocate(object);
314 return (ENOMEM);
315 }
316
317 /* free old resources if we're resizing */
318 pipe_free_kmem(cpipe);
319 cpipe->pipe_buffer.object = object;
320 cpipe->pipe_buffer.buffer = buffer;
321 cpipe->pipe_buffer.size = size;
322 cpipe->pipe_buffer.in = 0;
323 cpipe->pipe_buffer.out = 0;
324 cpipe->pipe_buffer.cnt = 0;
325 amountpipekva += cpipe->pipe_buffer.size;
326 return (0);
327}
328
329/*
330 * initialize and allocate VM and memory for pipe
331 */
332static int
333pipe_create(cpipep)
334 struct pipe **cpipep;
335{
336 struct pipe *cpipe;
337 int error;
338
339 *cpipep = zalloc(pipe_zone);
340 if (*cpipep == NULL)
341 return (ENOMEM);
342
343 cpipe = *cpipep;
344
345 /* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
346 cpipe->pipe_buffer.object = NULL;
347#ifndef PIPE_NODIRECT
348 cpipe->pipe_map.kva = NULL;
349#endif
350 /*
351 * protect so pipeclose() doesn't follow a junk pointer
352 * if pipespace() fails.
353 */
354 bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
355 cpipe->pipe_state = 0;
356 cpipe->pipe_peer = NULL;
357 cpipe->pipe_busy = 0;
358
359#ifndef PIPE_NODIRECT
360 /*
361 * pipe data structure initializations to support direct pipe I/O
362 */
363 cpipe->pipe_map.cnt = 0;
364 cpipe->pipe_map.kva = 0;
365 cpipe->pipe_map.pos = 0;
366 cpipe->pipe_map.npages = 0;
367 /* cpipe->pipe_map.ms[] = invalid */
368#endif
369
370 cpipe->pipe_mtxp = NULL; /* avoid pipespace assertion */
371 error = pipespace(cpipe, PIPE_SIZE);
372 if (error)
373 return (error);
374
375 vfs_timestamp(&cpipe->pipe_ctime);
376 cpipe->pipe_atime = cpipe->pipe_ctime;
377 cpipe->pipe_mtime = cpipe->pipe_ctime;
378
379 return (0);
380}
381
382
383/*
384 * lock a pipe for I/O, blocking other access
385 */
386static __inline int
387pipelock(cpipe, catch)
388 struct pipe *cpipe;
389 int catch;
390{
391 int error;
392
393 PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
394 while (cpipe->pipe_state & PIPE_LOCKFL) {
395 cpipe->pipe_state |= PIPE_LWANT;
396 error = msleep(cpipe, PIPE_MTX(cpipe),
397 catch ? (PRIBIO | PCATCH) : PRIBIO,
398 "pipelk", 0);
399 if (error != 0)
400 return (error);
401 }
402 cpipe->pipe_state |= PIPE_LOCKFL;
403 return (0);
404}
405
406/*
407 * unlock a pipe I/O lock
408 */
409static __inline void
410pipeunlock(cpipe)
411 struct pipe *cpipe;
412{
413
414 PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
415 cpipe->pipe_state &= ~PIPE_LOCKFL;
416 if (cpipe->pipe_state & PIPE_LWANT) {
417 cpipe->pipe_state &= ~PIPE_LWANT;
418 wakeup(cpipe);
419 }
420}
421
422static __inline void
423pipeselwakeup(cpipe)
424 struct pipe *cpipe;
425{
426
427 if (cpipe->pipe_state & PIPE_SEL) {
428 cpipe->pipe_state &= ~PIPE_SEL;
429 selwakeup(&cpipe->pipe_sel);
430 }
431 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
432 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
433 KNOTE(&cpipe->pipe_sel.si_note, 0);
434}
435
436/* ARGSUSED */
437static int
438pipe_read(fp, uio, cred, flags, td)
439 struct file *fp;
440 struct uio *uio;
441 struct ucred *cred;
442 struct thread *td;
443 int flags;
444{
445 struct pipe *rpipe = (struct pipe *) fp->f_data;
446 int error;
447 int nread = 0;
448 u_int size;
449
450 PIPE_LOCK(rpipe);
451 ++rpipe->pipe_busy;
452 error = pipelock(rpipe, 1);
453 if (error)
454 goto unlocked_error;
455
456 while (uio->uio_resid) {
457 /*
458 * normal pipe buffer receive
459 */
460 if (rpipe->pipe_buffer.cnt > 0) {
461 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
462 if (size > rpipe->pipe_buffer.cnt)
463 size = rpipe->pipe_buffer.cnt;
464 if (size > (u_int) uio->uio_resid)
465 size = (u_int) uio->uio_resid;
466
467 PIPE_UNLOCK(rpipe);
468 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
469 size, uio);
470 PIPE_LOCK(rpipe);
471 if (error)
472 break;
473
474 rpipe->pipe_buffer.out += size;
475 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
476 rpipe->pipe_buffer.out = 0;
477
478 rpipe->pipe_buffer.cnt -= size;
479
480 /*
481 * If there is no more to read in the pipe, reset
482 * its pointers to the beginning. This improves
483 * cache hit stats.
484 */
485 if (rpipe->pipe_buffer.cnt == 0) {
486 rpipe->pipe_buffer.in = 0;
487 rpipe->pipe_buffer.out = 0;
488 }
489 nread += size;
490#ifndef PIPE_NODIRECT
491 /*
492 * Direct copy, bypassing a kernel buffer.
493 */
494 } else if ((size = rpipe->pipe_map.cnt) &&
495 (rpipe->pipe_state & PIPE_DIRECTW)) {
496 caddr_t va;
497 if (size > (u_int) uio->uio_resid)
498 size = (u_int) uio->uio_resid;
499
500 va = (caddr_t) rpipe->pipe_map.kva +
501 rpipe->pipe_map.pos;
502 PIPE_UNLOCK(rpipe);
503 error = uiomove(va, size, uio);
504 PIPE_LOCK(rpipe);
505 if (error)
506 break;
507 nread += size;
508 rpipe->pipe_map.pos += size;
509 rpipe->pipe_map.cnt -= size;
510 if (rpipe->pipe_map.cnt == 0) {
511 rpipe->pipe_state &= ~PIPE_DIRECTW;
512 wakeup(rpipe);
513 }
514#endif
515 } else {
516 /*
517 * detect EOF condition
518 * read returns 0 on EOF, no need to set error
519 */
520 if (rpipe->pipe_state & PIPE_EOF)
521 break;
522
523 /*
524 * If the "write-side" has been blocked, wake it up now.
525 */
526 if (rpipe->pipe_state & PIPE_WANTW) {
527 rpipe->pipe_state &= ~PIPE_WANTW;
528 wakeup(rpipe);
529 }
530
531 /*
532 * Break if some data was read.
533 */
534 if (nread > 0)
535 break;
536
537 /*
538 * Unlock the pipe buffer for our remaining processing. We
539 * will either break out with an error or we will sleep and
540 * relock to loop.
541 */
542 pipeunlock(rpipe);
543
544 /*
545 * Handle non-blocking mode operation or
546 * wait for more data.
547 */
548 if (fp->f_flag & FNONBLOCK) {
549 error = EAGAIN;
550 } else {
551 rpipe->pipe_state |= PIPE_WANTR;
552 if ((error = msleep(rpipe, PIPE_MTX(rpipe),
553 PRIBIO | PCATCH,
554 "piperd", 0)) == 0)
555 error = pipelock(rpipe, 1);
556 }
557 if (error)
558 goto unlocked_error;
559 }
560 }
561 pipeunlock(rpipe);
562
563 /* XXX: should probably do this before getting any locks. */
564 if (error == 0)
565 vfs_timestamp(&rpipe->pipe_atime);
566unlocked_error:
567 --rpipe->pipe_busy;
568
569 /*
570 * PIPE_WANT processing only makes sense if pipe_busy is 0.
571 */
572 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
573 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
574 wakeup(rpipe);
575 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
576 /*
577 * Handle write blocking hysteresis.
578 */
579 if (rpipe->pipe_state & PIPE_WANTW) {
580 rpipe->pipe_state &= ~PIPE_WANTW;
581 wakeup(rpipe);
582 }
583 }
584
585 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
586 pipeselwakeup(rpipe);
587
588 PIPE_UNLOCK(rpipe);
589 return (error);
590}
591
592#ifndef PIPE_NODIRECT
593/*
594 * Map the sending processes' buffer into kernel space and wire it.
595 * This is similar to a physical write operation.
596 */
597static int
598pipe_build_write_buffer(wpipe, uio)
599 struct pipe *wpipe;
600 struct uio *uio;
601{
602 u_int size;
603 int i;
604 vm_offset_t addr, endaddr, paddr;
605
606 GIANT_REQUIRED;
607 PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
608
609 size = (u_int) uio->uio_iov->iov_len;
610 if (size > wpipe->pipe_buffer.size)
611 size = wpipe->pipe_buffer.size;
612
613 endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
614 addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
615 for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
616 vm_page_t m;
617
618 if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
619 (paddr = pmap_kextract(addr)) == 0) {
620 int j;
621
622 for (j = 0; j < i; j++)
623 vm_page_unwire(wpipe->pipe_map.ms[j], 1);
624 return (EFAULT);
625 }
626
627 m = PHYS_TO_VM_PAGE(paddr);
628 vm_page_wire(m);
629 wpipe->pipe_map.ms[i] = m;
630 }
631
632/*
633 * set up the control block
634 */
635 wpipe->pipe_map.npages = i;
636 wpipe->pipe_map.pos =
637 ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
638 wpipe->pipe_map.cnt = size;
639
640/*
641 * and map the buffer
642 */
643 if (wpipe->pipe_map.kva == 0) {
644 /*
645 * We need to allocate space for an extra page because the
646 * address range might (will) span pages at times.
647 */
648 wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
649 wpipe->pipe_buffer.size + PAGE_SIZE);
650 amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
651 }
652 pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
653 wpipe->pipe_map.npages);
654
655/*
656 * and update the uio data
657 */
658
659 uio->uio_iov->iov_len -= size;
660 uio->uio_iov->iov_base += size;
661 if (uio->uio_iov->iov_len == 0)
662 uio->uio_iov++;
663 uio->uio_resid -= size;
664 uio->uio_offset += size;
665 return (0);
666}
667
668/*
669 * unmap and unwire the process buffer
670 */
671static void
672pipe_destroy_write_buffer(wpipe)
673 struct pipe *wpipe;
674{
675 int i;
676
677 GIANT_REQUIRED;
678 PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
679
680 if (wpipe->pipe_map.kva) {
681 pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
682
683 if (amountpipekva > MAXPIPEKVA) {
684 vm_offset_t kva = wpipe->pipe_map.kva;
685 wpipe->pipe_map.kva = 0;
686 kmem_free(kernel_map, kva,
687 wpipe->pipe_buffer.size + PAGE_SIZE);
688 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
689 }
690 }
691 for (i = 0; i < wpipe->pipe_map.npages; i++)
692 vm_page_unwire(wpipe->pipe_map.ms[i], 1);
693 wpipe->pipe_map.npages = 0;
694}
695
696/*
697 * In the case of a signal, the writing process might go away. This
698 * code copies the data into the circular buffer so that the source
699 * pages can be freed without loss of data.
700 */
701static void
702pipe_clone_write_buffer(wpipe)
703 struct pipe *wpipe;
704{
705 int size;
706 int pos;
707
708 PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
709 size = wpipe->pipe_map.cnt;
710 pos = wpipe->pipe_map.pos;
711 bcopy((caddr_t) wpipe->pipe_map.kva + pos,
712 (caddr_t) wpipe->pipe_buffer.buffer, size);
713
714 wpipe->pipe_buffer.in = size;
715 wpipe->pipe_buffer.out = 0;
716 wpipe->pipe_buffer.cnt = size;
717 wpipe->pipe_state &= ~PIPE_DIRECTW;
718
719 PIPE_GET_GIANT(wpipe);
720 pipe_destroy_write_buffer(wpipe);
721 PIPE_DROP_GIANT(wpipe);
722}
723
724/*
725 * This implements the pipe buffer write mechanism. Note that only
726 * a direct write OR a normal pipe write can be pending at any given time.
727 * If there are any characters in the pipe buffer, the direct write will
728 * be deferred until the receiving process grabs all of the bytes from
729 * the pipe buffer. Then the direct mapping write is set-up.
730 */
731static int
732pipe_direct_write(wpipe, uio)
733 struct pipe *wpipe;
734 struct uio *uio;
735{
736 int error;
737
738retry:
739 PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
740 while (wpipe->pipe_state & PIPE_DIRECTW) {
741 if (wpipe->pipe_state & PIPE_WANTR) {
742 wpipe->pipe_state &= ~PIPE_WANTR;
743 wakeup(wpipe);
744 }
745 wpipe->pipe_state |= PIPE_WANTW;
746 error = msleep(wpipe, PIPE_MTX(wpipe),
747 PRIBIO | PCATCH, "pipdww", 0);
748 if (error)
749 goto error1;
750 if (wpipe->pipe_state & PIPE_EOF) {
751 error = EPIPE;
752 goto error1;
753 }
754 }
755 wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
756 if (wpipe->pipe_buffer.cnt > 0) {
757 if (wpipe->pipe_state & PIPE_WANTR) {
758 wpipe->pipe_state &= ~PIPE_WANTR;
759 wakeup(wpipe);
760 }
761
762 wpipe->pipe_state |= PIPE_WANTW;
763 error = msleep(wpipe, PIPE_MTX(wpipe),
764 PRIBIO | PCATCH, "pipdwc", 0);
765 if (error)
766 goto error1;
767 if (wpipe->pipe_state & PIPE_EOF) {
768 error = EPIPE;
769 goto error1;
770 }
771 goto retry;
772 }
773
774 wpipe->pipe_state |= PIPE_DIRECTW;
775
776 pipelock(wpipe, 0);
777 PIPE_GET_GIANT(wpipe);
778 error = pipe_build_write_buffer(wpipe, uio);
779 PIPE_DROP_GIANT(wpipe);
780 pipeunlock(wpipe);
781 if (error) {
782 wpipe->pipe_state &= ~PIPE_DIRECTW;
783 goto error1;
784 }
785
786 error = 0;
787 while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
788 if (wpipe->pipe_state & PIPE_EOF) {
789 pipelock(wpipe, 0);
790 PIPE_GET_GIANT(wpipe);
791 pipe_destroy_write_buffer(wpipe);
792 PIPE_DROP_GIANT(wpipe);
793 pipeunlock(wpipe);
794 pipeselwakeup(wpipe);
795 error = EPIPE;
796 goto error1;
797 }
798 if (wpipe->pipe_state & PIPE_WANTR) {
799 wpipe->pipe_state &= ~PIPE_WANTR;
800 wakeup(wpipe);
801 }
802 pipeselwakeup(wpipe);
803 error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
804 "pipdwt", 0);
805 }
806
807 pipelock(wpipe,0);
808 if (wpipe->pipe_state & PIPE_DIRECTW) {
809 /*
810 * this bit of trickery substitutes a kernel buffer for
811 * the process that might be going away.
812 */
813 pipe_clone_write_buffer(wpipe);
814 } else {
815 PIPE_GET_GIANT(wpipe);
816 pipe_destroy_write_buffer(wpipe);
817 PIPE_DROP_GIANT(wpipe);
818 }
819 pipeunlock(wpipe);
820 return (error);
821
822error1:
823 wakeup(wpipe);
824 return (error);
825}
826#endif
827
828static int
829pipe_write(fp, uio, cred, flags, td)
830 struct file *fp;
831 struct uio *uio;
832 struct ucred *cred;
833 struct thread *td;
834 int flags;
835{
836 int error = 0;
837 int orig_resid;
838 struct pipe *wpipe, *rpipe;
839
840 rpipe = (struct pipe *) fp->f_data;
841 wpipe = rpipe->pipe_peer;
842
843 PIPE_LOCK(rpipe);
844 /*
845 * detect loss of pipe read side, issue SIGPIPE if lost.
846 */
847 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
848 PIPE_UNLOCK(rpipe);
849 return (EPIPE);
850 }
851 ++wpipe->pipe_busy;
852
853 /*
854 * If it is advantageous to resize the pipe buffer, do
855 * so.
856 */
857 if ((uio->uio_resid > PIPE_SIZE) &&
858 (nbigpipe < LIMITBIGPIPES) &&
859 (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
860 (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
861 (wpipe->pipe_buffer.cnt == 0)) {
862
863 if ((error = pipelock(wpipe,1)) == 0) {
864 PIPE_GET_GIANT(wpipe);
865 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
866 nbigpipe++;
867 PIPE_DROP_GIANT(wpipe);
868 pipeunlock(wpipe);
869 }
870 }
871
872 /*
 873	 * If an early error occurred, unbusy and return, waking up any pending
874 * readers.
875 */
876 if (error) {
877 --wpipe->pipe_busy;
878 if ((wpipe->pipe_busy == 0) &&
879 (wpipe->pipe_state & PIPE_WANT)) {
880 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
881 wakeup(wpipe);
882 }
883 PIPE_UNLOCK(rpipe);
884 return(error);
885 }
886
887 KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
888
889 orig_resid = uio->uio_resid;
890
891 while (uio->uio_resid) {
892 int space;
893
894#ifndef PIPE_NODIRECT
895 /*
896 * If the transfer is large, we can gain performance if
897 * we do process-to-process copies directly.
898 * If the write is non-blocking, we don't use the
899 * direct write mechanism.
900 *
901 * The direct write mechanism will detect the reader going
902 * away on us.
903 */
904 if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
905 (fp->f_flag & FNONBLOCK) == 0 &&
906 (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)) &&
907 (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
908 error = pipe_direct_write( wpipe, uio);
909 if (error)
910 break;
911 continue;
912 }
913#endif
914
915 /*
916 * Pipe buffered writes cannot be coincidental with
917 * direct writes. We wait until the currently executing
918 * direct write is completed before we start filling the
919 * pipe buffer. We break out if a signal occurs or the
920 * reader goes away.
921 */
922 retrywrite:
923 while (wpipe->pipe_state & PIPE_DIRECTW) {
924 if (wpipe->pipe_state & PIPE_WANTR) {
925 wpipe->pipe_state &= ~PIPE_WANTR;
926 wakeup(wpipe);
927 }
928 error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
929 "pipbww", 0);
930 if (wpipe->pipe_state & PIPE_EOF)
931 break;
932 if (error)
933 break;
934 }
935 if (wpipe->pipe_state & PIPE_EOF) {
936 error = EPIPE;
937 break;
938 }
939
940 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
941
942 /* Writes of size <= PIPE_BUF must be atomic. */
943 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
944 space = 0;
945
946 if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
947 if ((error = pipelock(wpipe,1)) == 0) {
948 int size; /* Transfer size */
949 int segsize; /* first segment to transfer */
950
951 /*
952 * It is possible for a direct write to
953 * slip in on us... handle it here...
954 */
955 if (wpipe->pipe_state & PIPE_DIRECTW) {
956 pipeunlock(wpipe);
957 goto retrywrite;
958 }
959 /*
960 * If a process blocked in uiomove, our
961 * value for space might be bad.
962 *
963 * XXX will we be ok if the reader has gone
964 * away here?
965 */
966 if (space > wpipe->pipe_buffer.size -
967 wpipe->pipe_buffer.cnt) {
968 pipeunlock(wpipe);
969 goto retrywrite;
970 }
971
972 /*
973 * Transfer size is minimum of uio transfer
974 * and free space in pipe buffer.
975 */
976 if (space > uio->uio_resid)
977 size = uio->uio_resid;
978 else
979 size = space;
980 /*
981 * First segment to transfer is minimum of
982 * transfer size and contiguous space in
983 * pipe buffer. If first segment to transfer
984 * is less than the transfer size, we've got
985 * a wraparound in the buffer.
986 */
987 segsize = wpipe->pipe_buffer.size -
988 wpipe->pipe_buffer.in;
989 if (segsize > size)
990 segsize = size;
991
992 /* Transfer first segment */
993
994 PIPE_UNLOCK(rpipe);
995 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
996 segsize, uio);
997 PIPE_LOCK(rpipe);
998
999 if (error == 0 && segsize < size) {
1000 /*
1001 * Transfer remaining part now, to
1002 * support atomic writes. Wraparound
1003 * happened.
1004 */
1005 if (wpipe->pipe_buffer.in + segsize !=
1006 wpipe->pipe_buffer.size)
1007 panic("Expected pipe buffer wraparound disappeared");
1008
1009 PIPE_UNLOCK(rpipe);
1010 error = uiomove(&wpipe->pipe_buffer.buffer[0],
1011 size - segsize, uio);
1012 PIPE_LOCK(rpipe);
1013 }
1014 if (error == 0) {
1015 wpipe->pipe_buffer.in += size;
1016 if (wpipe->pipe_buffer.in >=
1017 wpipe->pipe_buffer.size) {
1018 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1019 panic("Expected wraparound bad");
1020 wpipe->pipe_buffer.in = size - segsize;
1021 }
1022
1023 wpipe->pipe_buffer.cnt += size;
1024 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1025 panic("Pipe buffer overflow");
1026
1027 }
1028 pipeunlock(wpipe);
1029 }
1030 if (error)
1031 break;
1032
1033 } else {
1034 /*
1035 * If the "read-side" has been blocked, wake it up now.
1036 */
1037 if (wpipe->pipe_state & PIPE_WANTR) {
1038 wpipe->pipe_state &= ~PIPE_WANTR;
1039 wakeup(wpipe);
1040 }
1041
1042 /*
1043 * don't block on non-blocking I/O
1044 */
1045 if (fp->f_flag & FNONBLOCK) {
1046 error = EAGAIN;
1047 break;
1048 }
1049
1050 /*
1051 * We have no more space and have something to offer,
1052 * wake up select/poll.
1053 */
1054 pipeselwakeup(wpipe);
1055
1056 wpipe->pipe_state |= PIPE_WANTW;
1057 error = msleep(wpipe, PIPE_MTX(rpipe),
1058 PRIBIO | PCATCH, "pipewr", 0);
1059 if (error != 0)
1060 break;
1061 /*
1062 * If read side wants to go away, we just issue a signal
1063 * to ourselves.
1064 */
1065 if (wpipe->pipe_state & PIPE_EOF) {
1066 error = EPIPE;
1067 break;
1068 }
1069 }
1070 }
1071
1072 --wpipe->pipe_busy;
1073
1074 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1075 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1076 wakeup(wpipe);
1077 } else if (wpipe->pipe_buffer.cnt > 0) {
1078 /*
1079 * If we have put any characters in the buffer, we wake up
1080 * the reader.
1081 */
1082 if (wpipe->pipe_state & PIPE_WANTR) {
1083 wpipe->pipe_state &= ~PIPE_WANTR;
1084 wakeup(wpipe);
1085 }
1086 }
1087
1088 /*
1089 * Don't return EPIPE if I/O was successful
1090 */
1091 if ((wpipe->pipe_buffer.cnt == 0) &&
1092 (uio->uio_resid == 0) &&
1093 (error == EPIPE)) {
1094 error = 0;
1095 }
1096
1097 if (error == 0)
1098 vfs_timestamp(&wpipe->pipe_mtime);
1099
1100 /*
1101 * We have something to offer,
1102 * wake up select/poll.
1103 */
1104 if (wpipe->pipe_buffer.cnt)
1105 pipeselwakeup(wpipe);
1106
1107 PIPE_UNLOCK(rpipe);
1108 return (error);
1109}
1110
1111/*
1112 * we implement a very minimal set of ioctls for compatibility with sockets.
1113 */
1114int
1115pipe_ioctl(fp, cmd, data, td)
1116 struct file *fp;
1117 u_long cmd;
1118 caddr_t data;
1119 struct thread *td;
1120{
1121 struct pipe *mpipe = (struct pipe *)fp->f_data;
1122
1123 switch (cmd) {
1124
1125 case FIONBIO:
1126 return (0);
1127
1128 case FIOASYNC:
1129 PIPE_LOCK(mpipe);
1130 if (*(int *)data) {
1131 mpipe->pipe_state |= PIPE_ASYNC;
1132 } else {
1133 mpipe->pipe_state &= ~PIPE_ASYNC;
1134 }
1135 PIPE_UNLOCK(mpipe);
1136 return (0);
1137
1138 case FIONREAD:
1139 PIPE_LOCK(mpipe);
1140 if (mpipe->pipe_state & PIPE_DIRECTW)
1141 *(int *)data = mpipe->pipe_map.cnt;
1142 else
1143 *(int *)data = mpipe->pipe_buffer.cnt;
1144 PIPE_UNLOCK(mpipe);
1145 return (0);
1146
1147 case FIOSETOWN:
1148 return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1149
1150 case FIOGETOWN:
1151 *(int *)data = fgetown(mpipe->pipe_sigio);
1152 return (0);
1153
1154 /* This is deprecated, FIOSETOWN should be used instead. */
1155 case TIOCSPGRP:
1156 return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1157
1158 /* This is deprecated, FIOGETOWN should be used instead. */
1159 case TIOCGPGRP:
1160 *(int *)data = -fgetown(mpipe->pipe_sigio);
1161 return (0);
1162
1163 }
1164 return (ENOTTY);
1165}
1166
1167int
1168pipe_poll(fp, events, cred, td)
1169 struct file *fp;
1170 int events;
1171 struct ucred *cred;
1172 struct thread *td;
1173{
1174 struct pipe *rpipe = (struct pipe *)fp->f_data;
1175 struct pipe *wpipe;
1176 int revents = 0;
1177
1178 wpipe = rpipe->pipe_peer;
1179 PIPE_LOCK(rpipe);
1180 if (events & (POLLIN | POLLRDNORM))
1181 if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1182 (rpipe->pipe_buffer.cnt > 0) ||
1183 (rpipe->pipe_state & PIPE_EOF))
1184 revents |= events & (POLLIN | POLLRDNORM);
1185
1186 if (events & (POLLOUT | POLLWRNORM))
1187 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1188 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1189 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1190 revents |= events & (POLLOUT | POLLWRNORM);
1191
1192 if ((rpipe->pipe_state & PIPE_EOF) ||
1193 (wpipe == NULL) ||
1194 (wpipe->pipe_state & PIPE_EOF))
1195 revents |= POLLHUP;
1196
1197 if (revents == 0) {
1198 if (events & (POLLIN | POLLRDNORM)) {
1199 selrecord(td, &rpipe->pipe_sel);
1200 rpipe->pipe_state |= PIPE_SEL;
1201 }
1202
1203 if (events & (POLLOUT | POLLWRNORM)) {
1204 selrecord(td, &wpipe->pipe_sel);
1205 wpipe->pipe_state |= PIPE_SEL;
1206 }
1207 }
1208 PIPE_UNLOCK(rpipe);
1209
1210 return (revents);
1211}
1212
1213static int
1214pipe_stat(fp, ub, td)
1215 struct file *fp;
1216 struct stat *ub;
1217 struct thread *td;
1218{
1219 struct pipe *pipe = (struct pipe *)fp->f_data;
1220
1221 bzero((caddr_t)ub, sizeof(*ub));
1222 ub->st_mode = S_IFIFO;
1223 ub->st_blksize = pipe->pipe_buffer.size;
1224 ub->st_size = pipe->pipe_buffer.cnt;
1225 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1226 ub->st_atimespec = pipe->pipe_atime;
1227 ub->st_mtimespec = pipe->pipe_mtime;
1228 ub->st_ctimespec = pipe->pipe_ctime;
1229 ub->st_uid = fp->f_cred->cr_uid;
1230 ub->st_gid = fp->f_cred->cr_gid;
1231 /*
1232 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1233 * XXX (st_dev, st_ino) should be unique.
1234 */
1235 return (0);
1236}
1237
1238/* ARGSUSED */
1239static int
1240pipe_close(fp, td)
1241 struct file *fp;
1242 struct thread *td;
1243{
1244 struct pipe *cpipe = (struct pipe *)fp->f_data;
1245
1246 fp->f_ops = &badfileops;
1247 fp->f_data = NULL;
1248 funsetown(cpipe->pipe_sigio);
1249 pipeclose(cpipe);
1250 return (0);
1251}
1252
1253static void
1254pipe_free_kmem(cpipe)
1255 struct pipe *cpipe;
1256{
1257
1258 GIANT_REQUIRED;
1259 KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1260 ("pipespace: pipe mutex locked"));
1261
1262 if (cpipe->pipe_buffer.buffer != NULL) {
1263 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1264 --nbigpipe;
1265 amountpipekva -= cpipe->pipe_buffer.size;
1266 kmem_free(kernel_map,
1267 (vm_offset_t)cpipe->pipe_buffer.buffer,
1268 cpipe->pipe_buffer.size);
1269 cpipe->pipe_buffer.buffer = NULL;
1270 }
1271#ifndef PIPE_NODIRECT
1272 if (cpipe->pipe_map.kva != NULL) {
1273 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1274 kmem_free(kernel_map,
1275 cpipe->pipe_map.kva,
1276 cpipe->pipe_buffer.size + PAGE_SIZE);
1277 cpipe->pipe_map.cnt = 0;
1278 cpipe->pipe_map.kva = 0;
1279 cpipe->pipe_map.pos = 0;
1280 cpipe->pipe_map.npages = 0;
1281 }
1282#endif
1283}
1284
1285/*
1286 * shutdown the pipe
1287 */
1288static void
1289pipeclose(cpipe)
1290 struct pipe *cpipe;
1291{
1292 struct pipe *ppipe;
1293 int hadpeer;
1294
1295 if (cpipe == NULL)
1296 return;
1297
1298 hadpeer = 0;
1299
1300 /* partially created pipes won't have a valid mutex. */
1301 if (PIPE_MTX(cpipe) != NULL)
1302 PIPE_LOCK(cpipe);
1303
1304 pipeselwakeup(cpipe);
1305
1306 /*
1307 * If the other side is blocked, wake it up saying that
1308 * we want to close it down.
1309 */
1310 while (cpipe->pipe_busy) {
1311 wakeup(cpipe);
1312 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1313 msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1314 }
1315
1316 /*
1317 * Disconnect from peer
1318 */
1319 if ((ppipe = cpipe->pipe_peer) != NULL) {
1320 hadpeer++;
1321 pipeselwakeup(ppipe);
1322
1323 ppipe->pipe_state |= PIPE_EOF;
1324 wakeup(ppipe);
1325 KNOTE(&ppipe->pipe_sel.si_note, 0);
1326 ppipe->pipe_peer = NULL;
1327 }
1328 /*
1329 * free resources
1330 */
1331 if (PIPE_MTX(cpipe) != NULL) {
1332 PIPE_UNLOCK(cpipe);
1333 if (!hadpeer) {
1334 mtx_destroy(PIPE_MTX(cpipe));
1335 free(PIPE_MTX(cpipe), M_TEMP);
1336 }
1337 }
1338 mtx_lock(&Giant);
1339 pipe_free_kmem(cpipe);
1340 zfree(pipe_zone, cpipe);
1341 mtx_unlock(&Giant);
1342}
1343
1344/*ARGSUSED*/
1345static int
1346pipe_kqfilter(struct file *fp, struct knote *kn)
1347{
1348 struct pipe *cpipe;
1349
1350 cpipe = (struct pipe *)kn->kn_fp->f_data;
1351 switch (kn->kn_filter) {
1352 case EVFILT_READ:
1353 kn->kn_fop = &pipe_rfiltops;
1354 break;
1355 case EVFILT_WRITE:
1356 kn->kn_fop = &pipe_wfiltops;
1357 cpipe = cpipe->pipe_peer;
1358 break;
1359 default:
1360 return (1);
1361 }
1362 kn->kn_hook = (caddr_t)cpipe;
1363
1364 PIPE_LOCK(cpipe);
1365 SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1366 PIPE_UNLOCK(cpipe);
1367 return (0);
1368}
1369
1370static void
1371filt_pipedetach(struct knote *kn)
1372{
1373 struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1374
1375 PIPE_LOCK(cpipe);
1376 SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1377 PIPE_UNLOCK(cpipe);
1378}
1379
1380/*ARGSUSED*/
1381static int
1382filt_piperead(struct knote *kn, long hint)
1383{
1384 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1385 struct pipe *wpipe = rpipe->pipe_peer;
1386
1387 PIPE_LOCK(rpipe);
1388 kn->kn_data = rpipe->pipe_buffer.cnt;
1389 if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1390 kn->kn_data = rpipe->pipe_map.cnt;
1391
1392 if ((rpipe->pipe_state & PIPE_EOF) ||
1393 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1394 kn->kn_flags |= EV_EOF;
1395 PIPE_UNLOCK(rpipe);
1396 return (1);
1397 }
1398 PIPE_UNLOCK(rpipe);
1399 return (kn->kn_data > 0);
1400}
1401
1402/*ARGSUSED*/
1403static int
1404filt_pipewrite(struct knote *kn, long hint)
1405{
1406 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1407 struct pipe *wpipe = rpipe->pipe_peer;
1408
1409 PIPE_LOCK(rpipe);
1410 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1411 kn->kn_data = 0;
1412 kn->kn_flags |= EV_EOF;
1413 PIPE_UNLOCK(rpipe);
1414 return (1);
1415 }
1416 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1417 if (wpipe->pipe_state & PIPE_DIRECTW)
1418 kn->kn_data = 0;
1419
1420 PIPE_UNLOCK(rpipe);
1421 return (kn->kn_data >= PIPE_BUF);
1422}
187}
188
189/*
190 * The pipe system call for the DTYPE_PIPE type of pipes
191 */
192
193/* ARGSUSED */
194int
195pipe(td, uap)
196 struct thread *td;
197 struct pipe_args /* {
198 int dummy;
199 } */ *uap;
200{
201 struct filedesc *fdp = td->td_proc->p_fd;
202 struct file *rf, *wf;
203 struct pipe *rpipe, *wpipe;
204 struct mtx *pmtx;
205 int fd, error;
206
207 KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
208
209 pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
210
211 rpipe = wpipe = NULL;
212 if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
213 pipeclose(rpipe);
214 pipeclose(wpipe);
215 free(pmtx, M_TEMP);
216 return (ENFILE);
217 }
218
219 rpipe->pipe_state |= PIPE_DIRECTOK;
220 wpipe->pipe_state |= PIPE_DIRECTOK;
221
222 error = falloc(td, &rf, &fd);
223 if (error) {
224 pipeclose(rpipe);
225 pipeclose(wpipe);
226 free(pmtx, M_TEMP);
227 return (error);
228 }
229 fhold(rf);
230 td->td_retval[0] = fd;
231
232 /*
233 * Warning: once we've gotten past allocation of the fd for the
234 * read-side, we can only drop the read side via fdrop() in order
235 * to avoid races against processes which manage to dup() the read
236 * side while we are blocked trying to allocate the write side.
237 */
238 FILE_LOCK(rf);
239 rf->f_flag = FREAD | FWRITE;
240 rf->f_type = DTYPE_PIPE;
241 rf->f_data = (caddr_t)rpipe;
242 rf->f_ops = &pipeops;
243 FILE_UNLOCK(rf);
244 error = falloc(td, &wf, &fd);
245 if (error) {
246 FILEDESC_LOCK(fdp);
247 if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
248 fdp->fd_ofiles[td->td_retval[0]] = NULL;
249 FILEDESC_UNLOCK(fdp);
250 fdrop(rf, td);
251 } else
252 FILEDESC_UNLOCK(fdp);
253 fdrop(rf, td);
254 /* rpipe has been closed by fdrop(). */
255 pipeclose(wpipe);
256 free(pmtx, M_TEMP);
257 return (error);
258 }
259 FILE_LOCK(wf);
260 wf->f_flag = FREAD | FWRITE;
261 wf->f_type = DTYPE_PIPE;
262 wf->f_data = (caddr_t)wpipe;
263 wf->f_ops = &pipeops;
264 FILE_UNLOCK(wf);
265 td->td_retval[1] = fd;
266 rpipe->pipe_peer = wpipe;
267 wpipe->pipe_peer = rpipe;
268 mtx_init(pmtx, "pipe mutex", MTX_DEF);
269 rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
270 fdrop(rf, td);
271
272 return (0);
273}
274
275/*
276 * Allocate kva for pipe circular buffer, the space is pageable
277 * This routine will 'realloc' the size of a pipe safely, if it fails
278 * it will retain the old buffer.
279 * If it fails it will return ENOMEM.
280 */
281static int
282pipespace(cpipe, size)
283 struct pipe *cpipe;
284 int size;
285{
286 struct vm_object *object;
287 caddr_t buffer;
288 int npages, error;
289
290 GIANT_REQUIRED;
291 KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
292 ("pipespace: pipe mutex locked"));
293
294 npages = round_page(size)/PAGE_SIZE;
295 /*
296 * Create an object, I don't like the idea of paging to/from
297 * kernel_object.
298 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
299 */
300 object = vm_object_allocate(OBJT_DEFAULT, npages);
301 buffer = (caddr_t) vm_map_min(kernel_map);
302
303 /*
304 * Insert the object into the kernel map, and allocate kva for it.
305 * The map entry is, by default, pageable.
306 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
307 */
308 error = vm_map_find(kernel_map, object, 0,
309 (vm_offset_t *) &buffer, size, 1,
310 VM_PROT_ALL, VM_PROT_ALL, 0);
311
312 if (error != KERN_SUCCESS) {
313 vm_object_deallocate(object);
314 return (ENOMEM);
315 }
316
317 /* free old resources if we're resizing */
318 pipe_free_kmem(cpipe);
319 cpipe->pipe_buffer.object = object;
320 cpipe->pipe_buffer.buffer = buffer;
321 cpipe->pipe_buffer.size = size;
322 cpipe->pipe_buffer.in = 0;
323 cpipe->pipe_buffer.out = 0;
324 cpipe->pipe_buffer.cnt = 0;
325 amountpipekva += cpipe->pipe_buffer.size;
326 return (0);
327}
328
329/*
330 * initialize and allocate VM and memory for pipe
331 */
332static int
333pipe_create(cpipep)
334 struct pipe **cpipep;
335{
336 struct pipe *cpipe;
337 int error;
338
339 *cpipep = zalloc(pipe_zone);
340 if (*cpipep == NULL)
341 return (ENOMEM);
342
343 cpipe = *cpipep;
344
345 /* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
346 cpipe->pipe_buffer.object = NULL;
347#ifndef PIPE_NODIRECT
348 cpipe->pipe_map.kva = NULL;
349#endif
350 /*
351 * protect so pipeclose() doesn't follow a junk pointer
352 * if pipespace() fails.
353 */
354 bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
355 cpipe->pipe_state = 0;
356 cpipe->pipe_peer = NULL;
357 cpipe->pipe_busy = 0;
358
359#ifndef PIPE_NODIRECT
360 /*
361 * pipe data structure initializations to support direct pipe I/O
362 */
363 cpipe->pipe_map.cnt = 0;
364 cpipe->pipe_map.kva = 0;
365 cpipe->pipe_map.pos = 0;
366 cpipe->pipe_map.npages = 0;
367 /* cpipe->pipe_map.ms[] = invalid */
368#endif
369
370 cpipe->pipe_mtxp = NULL; /* avoid pipespace assertion */
371 error = pipespace(cpipe, PIPE_SIZE);
372 if (error)
373 return (error);
374
375 vfs_timestamp(&cpipe->pipe_ctime);
376 cpipe->pipe_atime = cpipe->pipe_ctime;
377 cpipe->pipe_mtime = cpipe->pipe_ctime;
378
379 return (0);
380}
381
382
383/*
384 * lock a pipe for I/O, blocking other access
385 */
386static __inline int
387pipelock(cpipe, catch)
388 struct pipe *cpipe;
389 int catch;
390{
391 int error;
392
393 PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
394 while (cpipe->pipe_state & PIPE_LOCKFL) {
395 cpipe->pipe_state |= PIPE_LWANT;
396 error = msleep(cpipe, PIPE_MTX(cpipe),
397 catch ? (PRIBIO | PCATCH) : PRIBIO,
398 "pipelk", 0);
399 if (error != 0)
400 return (error);
401 }
402 cpipe->pipe_state |= PIPE_LOCKFL;
403 return (0);
404}
405
406/*
407 * unlock a pipe I/O lock
408 */
409static __inline void
410pipeunlock(cpipe)
411 struct pipe *cpipe;
412{
413
414 PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
415 cpipe->pipe_state &= ~PIPE_LOCKFL;
416 if (cpipe->pipe_state & PIPE_LWANT) {
417 cpipe->pipe_state &= ~PIPE_LWANT;
418 wakeup(cpipe);
419 }
420}
421
422static __inline void
423pipeselwakeup(cpipe)
424 struct pipe *cpipe;
425{
426
427 if (cpipe->pipe_state & PIPE_SEL) {
428 cpipe->pipe_state &= ~PIPE_SEL;
429 selwakeup(&cpipe->pipe_sel);
430 }
431 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
432 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
433 KNOTE(&cpipe->pipe_sel.si_note, 0);
434}
435
436/* ARGSUSED */
437static int
438pipe_read(fp, uio, cred, flags, td)
439 struct file *fp;
440 struct uio *uio;
441 struct ucred *cred;
442 struct thread *td;
443 int flags;
444{
445 struct pipe *rpipe = (struct pipe *) fp->f_data;
446 int error;
447 int nread = 0;
448 u_int size;
449
450 PIPE_LOCK(rpipe);
451 ++rpipe->pipe_busy;
452 error = pipelock(rpipe, 1);
453 if (error)
454 goto unlocked_error;
455
456 while (uio->uio_resid) {
457 /*
458 * normal pipe buffer receive
459 */
460 if (rpipe->pipe_buffer.cnt > 0) {
461 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
462 if (size > rpipe->pipe_buffer.cnt)
463 size = rpipe->pipe_buffer.cnt;
464 if (size > (u_int) uio->uio_resid)
465 size = (u_int) uio->uio_resid;
466
467 PIPE_UNLOCK(rpipe);
468 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
469 size, uio);
470 PIPE_LOCK(rpipe);
471 if (error)
472 break;
473
474 rpipe->pipe_buffer.out += size;
475 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
476 rpipe->pipe_buffer.out = 0;
477
478 rpipe->pipe_buffer.cnt -= size;
479
480 /*
481 * If there is no more to read in the pipe, reset
482 * its pointers to the beginning. This improves
483 * cache hit stats.
484 */
485 if (rpipe->pipe_buffer.cnt == 0) {
486 rpipe->pipe_buffer.in = 0;
487 rpipe->pipe_buffer.out = 0;
488 }
489 nread += size;
490#ifndef PIPE_NODIRECT
491 /*
492 * Direct copy, bypassing a kernel buffer.
493 */
494 } else if ((size = rpipe->pipe_map.cnt) &&
495 (rpipe->pipe_state & PIPE_DIRECTW)) {
496 caddr_t va;
497 if (size > (u_int) uio->uio_resid)
498 size = (u_int) uio->uio_resid;
499
500 va = (caddr_t) rpipe->pipe_map.kva +
501 rpipe->pipe_map.pos;
502 PIPE_UNLOCK(rpipe);
503 error = uiomove(va, size, uio);
504 PIPE_LOCK(rpipe);
505 if (error)
506 break;
507 nread += size;
508 rpipe->pipe_map.pos += size;
509 rpipe->pipe_map.cnt -= size;
510 if (rpipe->pipe_map.cnt == 0) {
511 rpipe->pipe_state &= ~PIPE_DIRECTW;
512 wakeup(rpipe);
513 }
514#endif
515 } else {
516 /*
517 * detect EOF condition
518 * read returns 0 on EOF, no need to set error
519 */
520 if (rpipe->pipe_state & PIPE_EOF)
521 break;
522
523 /*
524 * If the "write-side" has been blocked, wake it up now.
525 */
526 if (rpipe->pipe_state & PIPE_WANTW) {
527 rpipe->pipe_state &= ~PIPE_WANTW;
528 wakeup(rpipe);
529 }
530
531 /*
532 * Break if some data was read.
533 */
534 if (nread > 0)
535 break;
536
537 /*
538 * Unlock the pipe buffer for our remaining processing. We
539 * will either break out with an error or we will sleep and
540 * relock to loop.
541 */
542 pipeunlock(rpipe);
543
544 /*
545 * Handle non-blocking mode operation or
546 * wait for more data.
547 */
548 if (fp->f_flag & FNONBLOCK) {
549 error = EAGAIN;
550 } else {
551 rpipe->pipe_state |= PIPE_WANTR;
552 if ((error = msleep(rpipe, PIPE_MTX(rpipe),
553 PRIBIO | PCATCH,
554 "piperd", 0)) == 0)
555 error = pipelock(rpipe, 1);
556 }
557 if (error)
558 goto unlocked_error;
559 }
560 }
561 pipeunlock(rpipe);
562
563 /* XXX: should probably do this before getting any locks. */
564 if (error == 0)
565 vfs_timestamp(&rpipe->pipe_atime);
566unlocked_error:
567 --rpipe->pipe_busy;
568
569 /*
570 * PIPE_WANT processing only makes sense if pipe_busy is 0.
571 */
572 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
573 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
574 wakeup(rpipe);
575 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
576 /*
577 * Handle write blocking hysteresis.
578 */
579 if (rpipe->pipe_state & PIPE_WANTW) {
580 rpipe->pipe_state &= ~PIPE_WANTW;
581 wakeup(rpipe);
582 }
583 }
584
585 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
586 pipeselwakeup(rpipe);
587
588 PIPE_UNLOCK(rpipe);
589 return (error);
590}
591
592#ifndef PIPE_NODIRECT
593/*
594 * Map the sending processes' buffer into kernel space and wire it.
595 * This is similar to a physical write operation.
596 */
597static int
598pipe_build_write_buffer(wpipe, uio)
599 struct pipe *wpipe;
600 struct uio *uio;
601{
602 u_int size;
603 int i;
604 vm_offset_t addr, endaddr, paddr;
605
606 GIANT_REQUIRED;
607 PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
608
609 size = (u_int) uio->uio_iov->iov_len;
610 if (size > wpipe->pipe_buffer.size)
611 size = wpipe->pipe_buffer.size;
612
613 endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
614 addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
615 for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
616 vm_page_t m;
617
618 if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
619 (paddr = pmap_kextract(addr)) == 0) {
620 int j;
621
622 for (j = 0; j < i; j++)
623 vm_page_unwire(wpipe->pipe_map.ms[j], 1);
624 return (EFAULT);
625 }
626
627 m = PHYS_TO_VM_PAGE(paddr);
628 vm_page_wire(m);
629 wpipe->pipe_map.ms[i] = m;
630 }
631
632/*
633 * set up the control block
634 */
635 wpipe->pipe_map.npages = i;
636 wpipe->pipe_map.pos =
637 ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
638 wpipe->pipe_map.cnt = size;
639
640/*
641 * and map the buffer
642 */
643 if (wpipe->pipe_map.kva == 0) {
644 /*
645 * We need to allocate space for an extra page because the
646 * address range might (will) span pages at times.
647 */
648 wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
649 wpipe->pipe_buffer.size + PAGE_SIZE);
650 amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
651 }
652 pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
653 wpipe->pipe_map.npages);
654
655/*
656 * and update the uio data
657 */
658
659 uio->uio_iov->iov_len -= size;
660 uio->uio_iov->iov_base += size;
661 if (uio->uio_iov->iov_len == 0)
662 uio->uio_iov++;
663 uio->uio_resid -= size;
664 uio->uio_offset += size;
665 return (0);
666}
667
668/*
669 * unmap and unwire the process buffer
670 */
671static void
672pipe_destroy_write_buffer(wpipe)
673 struct pipe *wpipe;
674{
675 int i;
676
677 GIANT_REQUIRED;
678 PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
679
680 if (wpipe->pipe_map.kva) {
681 pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
682
683 if (amountpipekva > MAXPIPEKVA) {
684 vm_offset_t kva = wpipe->pipe_map.kva;
685 wpipe->pipe_map.kva = 0;
686 kmem_free(kernel_map, kva,
687 wpipe->pipe_buffer.size + PAGE_SIZE);
688 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
689 }
690 }
691 for (i = 0; i < wpipe->pipe_map.npages; i++)
692 vm_page_unwire(wpipe->pipe_map.ms[i], 1);
693 wpipe->pipe_map.npages = 0;
694}
695
696/*
697 * In the case of a signal, the writing process might go away. This
698 * code copies the data into the circular buffer so that the source
699 * pages can be freed without loss of data.
700 */
701static void
702pipe_clone_write_buffer(wpipe)
703 struct pipe *wpipe;
704{
705 int size;
706 int pos;
707
708 PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
709 size = wpipe->pipe_map.cnt;
710 pos = wpipe->pipe_map.pos;
711 bcopy((caddr_t) wpipe->pipe_map.kva + pos,
712 (caddr_t) wpipe->pipe_buffer.buffer, size);
713
714 wpipe->pipe_buffer.in = size;
715 wpipe->pipe_buffer.out = 0;
716 wpipe->pipe_buffer.cnt = size;
717 wpipe->pipe_state &= ~PIPE_DIRECTW;
718
719 PIPE_GET_GIANT(wpipe);
720 pipe_destroy_write_buffer(wpipe);
721 PIPE_DROP_GIANT(wpipe);
722}
723
724/*
725 * This implements the pipe buffer write mechanism. Note that only
726 * a direct write OR a normal pipe write can be pending at any given time.
727 * If there are any characters in the pipe buffer, the direct write will
728 * be deferred until the receiving process grabs all of the bytes from
729 * the pipe buffer. Then the direct mapping write is set up.
730 */
731static int
732pipe_direct_write(wpipe, uio)
733 struct pipe *wpipe;
734 struct uio *uio;
735{
736 int error;
737
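	/*
	 * Only one direct write can be in progress at a time, and it may
	 * not start while buffered data is pending; wait for both
	 * conditions to clear before mapping the caller's pages.
	 */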
738retry:
739 PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
740 while (wpipe->pipe_state & PIPE_DIRECTW) {
741 if (wpipe->pipe_state & PIPE_WANTR) {
742 wpipe->pipe_state &= ~PIPE_WANTR;
743 wakeup(wpipe);
744 }
745 wpipe->pipe_state |= PIPE_WANTW;
746 error = msleep(wpipe, PIPE_MTX(wpipe),
747 PRIBIO | PCATCH, "pipdww", 0);
748 if (error)
749 goto error1;
750 if (wpipe->pipe_state & PIPE_EOF) {
751 error = EPIPE;
752 goto error1;
753 }
754 }
755 wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
756 if (wpipe->pipe_buffer.cnt > 0) {
757 if (wpipe->pipe_state & PIPE_WANTR) {
758 wpipe->pipe_state &= ~PIPE_WANTR;
759 wakeup(wpipe);
760 }
761
762 wpipe->pipe_state |= PIPE_WANTW;
763 error = msleep(wpipe, PIPE_MTX(wpipe),
764 PRIBIO | PCATCH, "pipdwc", 0);
765 if (error)
766 goto error1;
767 if (wpipe->pipe_state & PIPE_EOF) {
768 error = EPIPE;
769 goto error1;
770 }
771 goto retry;
772 }
773
774 wpipe->pipe_state |= PIPE_DIRECTW;
775
776 pipelock(wpipe, 0);
777 PIPE_GET_GIANT(wpipe);
778 error = pipe_build_write_buffer(wpipe, uio);
779 PIPE_DROP_GIANT(wpipe);
780 pipeunlock(wpipe);
781 if (error) {
782 wpipe->pipe_state &= ~PIPE_DIRECTW;
783 goto error1;
784 }
785
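	/*
	 * Wait until the reader has consumed the directly mapped data
	 * (clearing PIPE_DIRECTW), EOF is seen, or a signal interrupts us.
	 */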
786 error = 0;
787 while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
788 if (wpipe->pipe_state & PIPE_EOF) {
789 pipelock(wpipe, 0);
790 PIPE_GET_GIANT(wpipe);
791 pipe_destroy_write_buffer(wpipe);
792 PIPE_DROP_GIANT(wpipe);
793 pipeunlock(wpipe);
794 pipeselwakeup(wpipe);
795 error = EPIPE;
796 goto error1;
797 }
798 if (wpipe->pipe_state & PIPE_WANTR) {
799 wpipe->pipe_state &= ~PIPE_WANTR;
800 wakeup(wpipe);
801 }
802 pipeselwakeup(wpipe);
803 error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
804 "pipdwt", 0);
805 }
806
807	pipelock(wpipe, 0);
808	if (wpipe->pipe_state & PIPE_DIRECTW) {
809		/*
810		 * This bit of trickery substitutes a pageable kernel buffer
811		 * for the user buffer of the process that might be going away.
812		 */
813 pipe_clone_write_buffer(wpipe);
814 } else {
815 PIPE_GET_GIANT(wpipe);
816 pipe_destroy_write_buffer(wpipe);
817 PIPE_DROP_GIANT(wpipe);
818 }
819 pipeunlock(wpipe);
820 return (error);
821
822error1:
823 wakeup(wpipe);
824 return (error);
825}
826#endif
827
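/*
 * Pipe write path: grow the pipe buffer for large writes when resources
 * permit, hand large blocking writes to the direct (page-mapped) mechanism
 * when it is compiled in, and otherwise copy data into the circular buffer,
 * honoring the PIPE_BUF atomicity guarantee for small writes.
 */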
828static int
829pipe_write(fp, uio, cred, flags, td)
830 struct file *fp;
831 struct uio *uio;
832 struct ucred *cred;
833	int flags;
834	struct thread *td;
835{
836 int error = 0;
837 int orig_resid;
838 struct pipe *wpipe, *rpipe;
839
840 rpipe = (struct pipe *) fp->f_data;
841 wpipe = rpipe->pipe_peer;
842
843 PIPE_LOCK(rpipe);
844	/*
845	 * Detect loss of the pipe read side; EPIPE makes the caller raise SIGPIPE.
846	 */
847 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
848 PIPE_UNLOCK(rpipe);
849 return (EPIPE);
850 }
851 ++wpipe->pipe_busy;
852
853 /*
854 * If it is advantageous to resize the pipe buffer, do
855 * so.
856 */
857 if ((uio->uio_resid > PIPE_SIZE) &&
858 (nbigpipe < LIMITBIGPIPES) &&
859 (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
860 (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
861 (wpipe->pipe_buffer.cnt == 0)) {
862
863		if ((error = pipelock(wpipe, 1)) == 0) {
864 PIPE_GET_GIANT(wpipe);
865 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
866 nbigpipe++;
867 PIPE_DROP_GIANT(wpipe);
868 pipeunlock(wpipe);
869 }
870 }
871
872	/*
873	 * If an early error occurred, unbusy and return, waking up any
874	 * pending readers.
875	 */
876 if (error) {
877 --wpipe->pipe_busy;
878 if ((wpipe->pipe_busy == 0) &&
879 (wpipe->pipe_state & PIPE_WANT)) {
880 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
881 wakeup(wpipe);
882 }
883 PIPE_UNLOCK(rpipe);
884 return(error);
885 }
886
887 KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
888
889 orig_resid = uio->uio_resid;
890
891 while (uio->uio_resid) {
892 int space;
893
894#ifndef PIPE_NODIRECT
895 /*
896 * If the transfer is large, we can gain performance if
897 * we do process-to-process copies directly.
898 * If the write is non-blocking, we don't use the
899 * direct write mechanism.
900 *
901 * The direct write mechanism will detect the reader going
902 * away on us.
903 */
904		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
905		    (fp->f_flag & FNONBLOCK) == 0 &&
906		    (wpipe->pipe_map.kva ||
907		     (amountpipekva < LIMITPIPEKVA))) {
908			error = pipe_direct_write(wpipe, uio);
909 if (error)
910 break;
911 continue;
912 }
913#endif
914
915 /*
916		 * Pipe buffered writes cannot proceed concurrently with
917 * direct writes. We wait until the currently executing
918 * direct write is completed before we start filling the
919 * pipe buffer. We break out if a signal occurs or the
920 * reader goes away.
921 */
922 retrywrite:
923 while (wpipe->pipe_state & PIPE_DIRECTW) {
924 if (wpipe->pipe_state & PIPE_WANTR) {
925 wpipe->pipe_state &= ~PIPE_WANTR;
926 wakeup(wpipe);
927 }
928 error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
929 "pipbww", 0);
930 if (wpipe->pipe_state & PIPE_EOF)
931 break;
932 if (error)
933 break;
934 }
935 if (wpipe->pipe_state & PIPE_EOF) {
936 error = EPIPE;
937 break;
938 }
939
940 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
941
942 /* Writes of size <= PIPE_BUF must be atomic. */
943 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
944 space = 0;
945
946 if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
947			if ((error = pipelock(wpipe, 1)) == 0) {
948 int size; /* Transfer size */
949 int segsize; /* first segment to transfer */
950
951 /*
952 * It is possible for a direct write to
953 * slip in on us... handle it here...
954 */
955 if (wpipe->pipe_state & PIPE_DIRECTW) {
956 pipeunlock(wpipe);
957 goto retrywrite;
958 }
959 /*
960 * If a process blocked in uiomove, our
961 * value for space might be bad.
962 *
963 * XXX will we be ok if the reader has gone
964 * away here?
965 */
966 if (space > wpipe->pipe_buffer.size -
967 wpipe->pipe_buffer.cnt) {
968 pipeunlock(wpipe);
969 goto retrywrite;
970 }
971
972 /*
973 * Transfer size is minimum of uio transfer
974 * and free space in pipe buffer.
975 */
976 if (space > uio->uio_resid)
977 size = uio->uio_resid;
978 else
979 size = space;
980 /*
981 * First segment to transfer is minimum of
982 * transfer size and contiguous space in
983 * pipe buffer. If first segment to transfer
984 * is less than the transfer size, we've got
985 * a wraparound in the buffer.
986 */
987 segsize = wpipe->pipe_buffer.size -
988 wpipe->pipe_buffer.in;
989 if (segsize > size)
990 segsize = size;
991
992 /* Transfer first segment */
993
994 PIPE_UNLOCK(rpipe);
995 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
996 segsize, uio);
997 PIPE_LOCK(rpipe);
998
999 if (error == 0 && segsize < size) {
1000 /*
1001 * Transfer remaining part now, to
1002 * support atomic writes. Wraparound
1003 * happened.
1004 */
1005 if (wpipe->pipe_buffer.in + segsize !=
1006 wpipe->pipe_buffer.size)
1007 panic("Expected pipe buffer wraparound disappeared");
1008
1009 PIPE_UNLOCK(rpipe);
1010 error = uiomove(&wpipe->pipe_buffer.buffer[0],
1011 size - segsize, uio);
1012 PIPE_LOCK(rpipe);
1013 }
1014 if (error == 0) {
1015 wpipe->pipe_buffer.in += size;
1016 if (wpipe->pipe_buffer.in >=
1017 wpipe->pipe_buffer.size) {
1018 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1019 panic("Expected wraparound bad");
1020 wpipe->pipe_buffer.in = size - segsize;
1021 }
1022
1023 wpipe->pipe_buffer.cnt += size;
1024 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1025 panic("Pipe buffer overflow");
1026
1027 }
1028 pipeunlock(wpipe);
1029 }
1030 if (error)
1031 break;
1032
1033 } else {
1034 /*
1035 * If the "read-side" has been blocked, wake it up now.
1036 */
1037 if (wpipe->pipe_state & PIPE_WANTR) {
1038 wpipe->pipe_state &= ~PIPE_WANTR;
1039 wakeup(wpipe);
1040 }
1041
1042 /*
1043 * don't block on non-blocking I/O
1044 */
1045 if (fp->f_flag & FNONBLOCK) {
1046 error = EAGAIN;
1047 break;
1048 }
1049
1050 /*
1051 * We have no more space and have something to offer,
1052 * wake up select/poll.
1053 */
1054 pipeselwakeup(wpipe);
1055
1056 wpipe->pipe_state |= PIPE_WANTW;
1057 error = msleep(wpipe, PIPE_MTX(rpipe),
1058 PRIBIO | PCATCH, "pipewr", 0);
1059 if (error != 0)
1060 break;
1061			/*
1062			 * If the read side wants to go away, return EPIPE;
1063			 * the caller will post SIGPIPE to us.
1064			 */
1065 if (wpipe->pipe_state & PIPE_EOF) {
1066 error = EPIPE;
1067 break;
1068 }
1069 }
1070 }
1071
1072 --wpipe->pipe_busy;
1073
1074 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1075 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1076 wakeup(wpipe);
1077 } else if (wpipe->pipe_buffer.cnt > 0) {
1078 /*
1079 * If we have put any characters in the buffer, we wake up
1080 * the reader.
1081 */
1082 if (wpipe->pipe_state & PIPE_WANTR) {
1083 wpipe->pipe_state &= ~PIPE_WANTR;
1084 wakeup(wpipe);
1085 }
1086 }
1087
1088 /*
1089 * Don't return EPIPE if I/O was successful
1090 */
1091 if ((wpipe->pipe_buffer.cnt == 0) &&
1092 (uio->uio_resid == 0) &&
1093 (error == EPIPE)) {
1094 error = 0;
1095 }
1096
1097 if (error == 0)
1098 vfs_timestamp(&wpipe->pipe_mtime);
1099
1100 /*
1101 * We have something to offer,
1102 * wake up select/poll.
1103 */
1104 if (wpipe->pipe_buffer.cnt)
1105 pipeselwakeup(wpipe);
1106
1107 PIPE_UNLOCK(rpipe);
1108 return (error);
1109}
1110
1111/*
1112 * we implement a very minimal set of ioctls for compatibility with sockets.
1113 */
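/*
 * For example, a process can use FIONREAD to ask how many bytes are
 * currently buffered on the read side of a pipe:
 *
 *	int nread;
 *
 *	if (ioctl(fd, FIONREAD, &nread) == 0)
 *		... nread bytes can be read without blocking ...
 */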
1114int
1115pipe_ioctl(fp, cmd, data, td)
1116 struct file *fp;
1117 u_long cmd;
1118 caddr_t data;
1119 struct thread *td;
1120{
1121 struct pipe *mpipe = (struct pipe *)fp->f_data;
1122
1123 switch (cmd) {
1124
1125 case FIONBIO:
1126 return (0);
1127
1128 case FIOASYNC:
1129 PIPE_LOCK(mpipe);
1130 if (*(int *)data) {
1131 mpipe->pipe_state |= PIPE_ASYNC;
1132 } else {
1133 mpipe->pipe_state &= ~PIPE_ASYNC;
1134 }
1135 PIPE_UNLOCK(mpipe);
1136 return (0);
1137
1138 case FIONREAD:
1139 PIPE_LOCK(mpipe);
1140 if (mpipe->pipe_state & PIPE_DIRECTW)
1141 *(int *)data = mpipe->pipe_map.cnt;
1142 else
1143 *(int *)data = mpipe->pipe_buffer.cnt;
1144 PIPE_UNLOCK(mpipe);
1145 return (0);
1146
1147 case FIOSETOWN:
1148 return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1149
1150 case FIOGETOWN:
1151 *(int *)data = fgetown(mpipe->pipe_sigio);
1152 return (0);
1153
1154 /* This is deprecated, FIOSETOWN should be used instead. */
1155 case TIOCSPGRP:
1156 return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1157
1158 /* This is deprecated, FIOGETOWN should be used instead. */
1159 case TIOCGPGRP:
1160 *(int *)data = -fgetown(mpipe->pipe_sigio);
1161 return (0);
1162
1163 }
1164 return (ENOTTY);
1165}
1166
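/*
 * Poll/select support: readable when data is buffered, a direct write is
 * pending, or EOF has been seen; writable when the peer is gone or at
 * least PIPE_BUF bytes of space remain in its buffer; POLLHUP when either
 * end has reached EOF.
 */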
1167int
1168pipe_poll(fp, events, cred, td)
1169 struct file *fp;
1170 int events;
1171 struct ucred *cred;
1172 struct thread *td;
1173{
1174 struct pipe *rpipe = (struct pipe *)fp->f_data;
1175 struct pipe *wpipe;
1176 int revents = 0;
1177
1178 wpipe = rpipe->pipe_peer;
1179 PIPE_LOCK(rpipe);
1180 if (events & (POLLIN | POLLRDNORM))
1181 if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1182 (rpipe->pipe_buffer.cnt > 0) ||
1183 (rpipe->pipe_state & PIPE_EOF))
1184 revents |= events & (POLLIN | POLLRDNORM);
1185
1186 if (events & (POLLOUT | POLLWRNORM))
1187 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1188 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1189 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1190 revents |= events & (POLLOUT | POLLWRNORM);
1191
1192 if ((rpipe->pipe_state & PIPE_EOF) ||
1193 (wpipe == NULL) ||
1194 (wpipe->pipe_state & PIPE_EOF))
1195 revents |= POLLHUP;
1196
1197 if (revents == 0) {
1198 if (events & (POLLIN | POLLRDNORM)) {
1199 selrecord(td, &rpipe->pipe_sel);
1200 rpipe->pipe_state |= PIPE_SEL;
1201 }
1202
1203 if (events & (POLLOUT | POLLWRNORM)) {
1204 selrecord(td, &wpipe->pipe_sel);
1205 wpipe->pipe_state |= PIPE_SEL;
1206 }
1207 }
1208 PIPE_UNLOCK(rpipe);
1209
1210 return (revents);
1211}
1212
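/*
 * Return stat information for a pipe; the buffer size is reported as the
 * block size and the amount of buffered data as the file size.
 */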
1213static int
1214pipe_stat(fp, ub, td)
1215 struct file *fp;
1216 struct stat *ub;
1217 struct thread *td;
1218{
1219 struct pipe *pipe = (struct pipe *)fp->f_data;
1220
1221 bzero((caddr_t)ub, sizeof(*ub));
1222 ub->st_mode = S_IFIFO;
1223 ub->st_blksize = pipe->pipe_buffer.size;
1224 ub->st_size = pipe->pipe_buffer.cnt;
1225 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1226 ub->st_atimespec = pipe->pipe_atime;
1227 ub->st_mtimespec = pipe->pipe_mtime;
1228 ub->st_ctimespec = pipe->pipe_ctime;
1229 ub->st_uid = fp->f_cred->cr_uid;
1230 ub->st_gid = fp->f_cred->cr_gid;
1231 /*
1232 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1233 * XXX (st_dev, st_ino) should be unique.
1234 */
1235 return (0);
1236}
1237
1238/* ARGSUSED */
1239static int
1240pipe_close(fp, td)
1241 struct file *fp;
1242 struct thread *td;
1243{
1244 struct pipe *cpipe = (struct pipe *)fp->f_data;
1245
1246 fp->f_ops = &badfileops;
1247 fp->f_data = NULL;
1248 funsetown(cpipe->pipe_sigio);
1249 pipeclose(cpipe);
1250 return (0);
1251}
1252
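/*
 * Free the pipe's buffer storage and any direct-write KVA; called with
 * Giant held and without the pipe mutex.
 */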
1253static void
1254pipe_free_kmem(cpipe)
1255 struct pipe *cpipe;
1256{
1257
1258 GIANT_REQUIRED;
1259	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1260	       ("pipe_free_kmem: pipe mutex locked"));
1261
1262 if (cpipe->pipe_buffer.buffer != NULL) {
1263 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1264 --nbigpipe;
1265 amountpipekva -= cpipe->pipe_buffer.size;
1266 kmem_free(kernel_map,
1267 (vm_offset_t)cpipe->pipe_buffer.buffer,
1268 cpipe->pipe_buffer.size);
1269 cpipe->pipe_buffer.buffer = NULL;
1270 }
1271#ifndef PIPE_NODIRECT
1272	if (cpipe->pipe_map.kva != 0) {
1273 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1274 kmem_free(kernel_map,
1275 cpipe->pipe_map.kva,
1276 cpipe->pipe_buffer.size + PAGE_SIZE);
1277 cpipe->pipe_map.cnt = 0;
1278 cpipe->pipe_map.kva = 0;
1279 cpipe->pipe_map.pos = 0;
1280 cpipe->pipe_map.npages = 0;
1281 }
1282#endif
1283}
1284
1285/*
1286 * shut down the pipe
1287 */
1288static void
1289pipeclose(cpipe)
1290 struct pipe *cpipe;
1291{
1292 struct pipe *ppipe;
1293 int hadpeer;
1294
1295 if (cpipe == NULL)
1296 return;
1297
1298 hadpeer = 0;
1299
1300 /* partially created pipes won't have a valid mutex. */
1301 if (PIPE_MTX(cpipe) != NULL)
1302 PIPE_LOCK(cpipe);
1303
1304 pipeselwakeup(cpipe);
1305
1306 /*
1307 * If the other side is blocked, wake it up saying that
1308 * we want to close it down.
1309 */
1310 while (cpipe->pipe_busy) {
1311 wakeup(cpipe);
1312 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1313 msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1314 }
1315
1316 /*
1317 * Disconnect from peer
1318 */
1319 if ((ppipe = cpipe->pipe_peer) != NULL) {
1320 hadpeer++;
1321 pipeselwakeup(ppipe);
1322
1323 ppipe->pipe_state |= PIPE_EOF;
1324 wakeup(ppipe);
1325 KNOTE(&ppipe->pipe_sel.si_note, 0);
1326 ppipe->pipe_peer = NULL;
1327 }
1328 /*
1329 * free resources
1330 */
1331 if (PIPE_MTX(cpipe) != NULL) {
1332 PIPE_UNLOCK(cpipe);
1333 if (!hadpeer) {
1334 mtx_destroy(PIPE_MTX(cpipe));
1335 free(PIPE_MTX(cpipe), M_TEMP);
1336 }
1337 }
1338 mtx_lock(&Giant);
1339 pipe_free_kmem(cpipe);
1340 zfree(pipe_zone, cpipe);
1341 mtx_unlock(&Giant);
1342}
1343
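/*
 * Attach a kqueue knote: EVFILT_READ knotes attach to this end of the
 * pipe, while EVFILT_WRITE knotes attach to its peer.
 */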
1344/*ARGSUSED*/
1345static int
1346pipe_kqfilter(struct file *fp, struct knote *kn)
1347{
1348 struct pipe *cpipe;
1349
1350 cpipe = (struct pipe *)kn->kn_fp->f_data;
1351 switch (kn->kn_filter) {
1352 case EVFILT_READ:
1353 kn->kn_fop = &pipe_rfiltops;
1354 break;
1355 case EVFILT_WRITE:
1356 kn->kn_fop = &pipe_wfiltops;
1357 cpipe = cpipe->pipe_peer;
1358 break;
1359 default:
1360 return (1);
1361 }
1362 kn->kn_hook = (caddr_t)cpipe;
1363
1364 PIPE_LOCK(cpipe);
1365 SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1366 PIPE_UNLOCK(cpipe);
1367 return (0);
1368}
1369
1370static void
1371filt_pipedetach(struct knote *kn)
1372{
1373 struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1374
1375 PIPE_LOCK(cpipe);
1376 SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1377 PIPE_UNLOCK(cpipe);
1378}
1379
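/*
 * Kqueue read filter: kn_data is the number of bytes available for
 * reading; EV_EOF is set once either end of the pipe has been closed.
 */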
1380/*ARGSUSED*/
1381static int
1382filt_piperead(struct knote *kn, long hint)
1383{
1384 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1385 struct pipe *wpipe = rpipe->pipe_peer;
1386
1387 PIPE_LOCK(rpipe);
1388 kn->kn_data = rpipe->pipe_buffer.cnt;
1389 if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1390 kn->kn_data = rpipe->pipe_map.cnt;
1391
1392 if ((rpipe->pipe_state & PIPE_EOF) ||
1393 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1394 kn->kn_flags |= EV_EOF;
1395 PIPE_UNLOCK(rpipe);
1396 return (1);
1397 }
1398 PIPE_UNLOCK(rpipe);
1399 return (kn->kn_data > 0);
1400}
1401
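/*
 * Kqueue write filter: kn_data is the space remaining in the peer's
 * buffer, forced to zero while a direct write is in progress; EV_EOF is
 * set once the peer has gone away.
 */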
1402/*ARGSUSED*/
1403static int
1404filt_pipewrite(struct knote *kn, long hint)
1405{
1406 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1407 struct pipe *wpipe = rpipe->pipe_peer;
1408
1409 PIPE_LOCK(rpipe);
1410 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1411 kn->kn_data = 0;
1412 kn->kn_flags |= EV_EOF;
1413 PIPE_UNLOCK(rpipe);
1414 return (1);
1415 }
1416 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1417 if (wpipe->pipe_state & PIPE_DIRECTW)
1418 kn->kn_data = 0;
1419
1420 PIPE_UNLOCK(rpipe);
1421 return (kn->kn_data >= PIPE_BUF);
1422}