1/* BEGIN LICENSE BLOCK
2 * Version: CMPL 1.1
3 *
4 * The contents of this file are subject to the Cisco-style Mozilla Public
5 * License Version 1.1 (the "License"); you may not use this file except
6 * in compliance with the License.  You may obtain a copy of the License
7 * at www.eclipse-clp.org/license.
8 *
9 * Software distributed under the License is distributed on an "AS IS"
10 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11 * the License for the specific language governing rights and limitations
12 * under the License.
13 *
14 * The Original Code is  The ECLiPSe Constraint Logic Programming System.
15 * The Initial Developer of the Original Code is  Cisco Systems, Inc.
16 * Portions created by the Initial Developer are
17 * Copyright (C) 1989-2006 Cisco Systems, Inc.  All Rights Reserved.
18 *
19 * Contributor(s):
20 *
21 * END LICENSE BLOCK */
22
23/*
24 * VERSION	$Id: io.c,v 1.19 2013/03/11 01:32:03 jschimpf Exp $
25 */
26
27/*
28 * IDENTIFICATION               io.c
29 *
30 * DESCRIPTION			Primitives for the I/O.
31 *
32 * CONTENTS:
33 *
34 * AUTHOR       VERSION  DATE   REASON
35 * Pierre Dufresne
36 * Micha Meier			Complete change - output buffering, handling
37 *				the terminal, prompts, system streams,
38 *				portability, the 'user' stream, raw mode,
39 *				interrupt fixes
40 *
41 * ----------------------------------------------------------------------
42 *
43 * General:
44 *
45 *   I/O Buffers:
46 *    -	Apart from null stream, all streams have a buffer
47 *    -	string and queue streams have lists of buffers and switch between them
48 *    -	buffers have LOOKAHEAD extra bytes in front (for unget/1 and lexer)
49 *    -	buffers have 1 extra byte after (for EOB_MARK, used in lexer)
50 *
51 *   StreamBuf(nst)	points to a buffer
52 *   StreamPtr(nst)	points into that buffer (read[/write] position)
53 *   StreamSize(nst)	is that buffer's size
54 *   StreamCnt(nst)	is the number of used bytes in that buffer
55 *
56 *   StreamWBuf(nst)	is used for queue streams only because they need to
57 *			maintain separate read and write positions.
58 *   StreamOffset 	offset of the current buffer from beginning of stream
59 *			(except for queue streams)
60 *
61 * ----------------------------------------------------------------------
62 *
63 * File/Tty streams:
64 *
65 *   They have a single buffer, used as follows:
66 *
67 *   Read streams (or RDWR, not MWRITE):
68 *
69 *	 already_read not_yet_read dont_care
70 *	 +------------+------------+---------+
71 *	buf          ptr        buf+cnt   buf+size
72 *
73 *
74 *
75 *   Write streams (or RDWR, not MREAD):
76 *
77 *	 not_yet_flushed dont_care
78 *	 +---------------+-------------------+
79 *	buf             ptr               buf+size
80 *
81 *   After seeking back:
82 *
83 *	 not_yet_flushed not_flushed
84 *	 +---------------+-----------+-------+
85 *	buf             ptr       buf+cnt buf+size
86 *
87 *
88 *
89 *   Updated stream (RDWR, MREAD, MWRITE):
90 *
91 *	 not_yet_flushed not_flushed
92 *	 +---------------+-----------+-------+
93 *	buf             ptr       buf+cnt buf+size
94 *
95 *	Ptr is used both for reading and writing.
96 *	When writing, ptr may go beyond buf+cnt.
97 *
98 *
99 *  - MWRITE means the buffer is dirty (was written into and not yet flushed)
100 *    MWRITE implies SWRITE or SRDWR.
101 *  - Offset is the offset of the beginning of the buffer from the
102 *    beginning of the file.
103 *  - MREAD means the buffer was read. This is only relevant for RDWR streams
104 *    and basically means that the fd is not positioned at offset.
105 *    MREAD implies SREAD or SRDWR.
106 *  - MEOF means that end_of_file was already read once, and the next attempt
107 *    should raise an error.
108 *
109 * ----------------------------------------------------------------------
110 *
111 * String streams:
112 *
113 *   String streams have a doubly linked list of buffers, all the same size,
114 *   which together hold the contents of the string stream. read/write/seek
115 *   can switch back and forth through this buffer list.
116 *   One of the buffers is always active (pointed to by StreamBuf) and used
117 *   similar to the single buffer in the case of true files.
118 *
119 *	 xxxxxxxxxxxxxxxxxxxxxxxx dont_care
120 *	 +---------------+--------+----------+
121 *	buf             ptr    buf+cnt    buf+size
122 *
123 *	Ptr is used both for reading and writing.
124 *	Buf+cnt indicates the end of the used buffer and is always >= ptr.
125 *	If the buffer is the last in the chain, Buf+cnt is the end of "file".
126 *
127 *  MREAD	always set, indicating the buffer content is valid
128 *  MWRITE	unused
129 *  StreamBuf	current buffer
130 *  StreamPtr	points into current buffer (read/write position)
131 *  StreamCnt	every buffer in the list has its own cnt field, which is
132 *		copied to StreamCnt when the buffer is/becomes current.
133 *  StreamWBuf	unused
134 *
135 * ----------------------------------------------------------------------
136 *
137 * Queue streams:
138 *
139 *   Queue streams have a doubly linked circular list of buffers, all the
140 *   same size. Read and write position are separate. The read buffer is
141 *   similar to the single buffer in the case of true files:
142 *
143 *	 dont_care    not_yet_read dont_care
144 *	 +------------+------------+---------+
145 *	buf          ptr        buf+cnt   buf+size
146 *
147 *	Ptr is used for reading only, writing is done at buf+cnt.
148 *
149 *   The write buffer is pointed to by StreamWBuf. The write position in
150 *   this buffer is StreamWBuf + BufHeader(StreamWBuf)->cnt.
151 *   StreamBuf and StreamWBuf are either identical (in which case the read
152 *   position is always below the write position), or StreamWBuf is just
153 *   before StreamBuf in the circular list. The write pointer is not allowed
154 *   to enter the read buffer from the left, instead, a fresh buffer gets
155 *   inserted between StreamWBuf and StreamBuf when needed during writing.
156 *   Buffers that become empty during reading are eagerly unlinked and freed.
157 *
158 *  MREAD	always set, indicating the buffer content is valid
159 *  MWRITE	queue was written into, but not yet flushed
160 *  StreamBuf	current (read) buffer
161 *  StreamCnt	every buffer in the list has its own cnt field, which is copied
162 *		to StreamCnt when the buffer is/becomes current (read) buffer.
163 *  StreamWBuf	current write buffer
164 *  StreamOffset is the sum of counts of full buffers between read and
165 *		write buffer (to speed up size computation).
166 *
167 * ----------------------------------------------------------------------
168 */
169
170
171/*
172 * INCLUDES:
173 */
174#include "config.h"
175#include "os_support.h"
176
177#include <stdio.h>	/* for sprintf, readline */
178#include <errno.h>	/* for EMFILE */
179#include <sys/types.h>
180#include <sys/stat.h>
181#include <fcntl.h>
182
183#ifdef HAVE_UNISTD_H
184#include <unistd.h>
185#else
186#ifndef _WIN32
187extern long		lseek();
188#endif
189#endif
190
191#ifdef HAVE_STRING_H
192#include <string.h>
193#endif
194
195#ifdef _WIN32
196
197#define Termio	int
198
199#else
200
201#if defined(HAVE_TCGETATTR)
202#define TERMIO_POSIX_STYLE
203#include <termios.h>
204#define Termio		struct termios
205#endif
206
207#if defined(TERMIO_SYS_V_STYLE)
208#include <termio.h>
209#define Termio		struct termio
210#define GetTermAttr	TCGETA
211#define SetTermAttr	TCSETA
212#else
213# if defined(HAVE_PUSHBACK)
214# include <termio.h>
215# endif
216#endif
217
218#if !defined(Termio)
219#define TERMIO_BSD_STYLE
220#include <sgtty.h>
221#include <sys/file.h>
222#include <sys/ioctl.h>
223#include <sys/time.h>
224#define Termio		struct sgttyb
225#define GetTermAttr	TIOCGETP
226#define SetTermAttr	TIOCSETN
227#endif
228
229#if defined(SIGIO_SETSIG)
230#include <stropts.h>
231#endif
232
233#if defined(SIGIO_FIOASYNC)
234#include <sys/ioctl.h>
235#endif
236
237#endif
238
239#include "sepia.h"
240#include "types.h"
241#include "embed.h"
242#include "mem.h"
243#include "error.h"
244#include "dict.h"
245#include "lex.h"
246#include "ec_io.h"
247#include "emu_export.h"
248#include "property.h"
249#include "module.h"
250
251/*
252 * DEFINES:
253 */
254
255#define STREAM_MIN	32
256#define STREAM_INC	16
257
258
259/*
260 * Set the stream number as a property of the atom's did. The counter for
261 * the previous stream, if any, is decremented, and the new one is incremented.
262 */
263#define Set_Stream(sdid, nst)						\
264	{								\
265		pword   *prop;						\
266		prop = get_property(sdid, STREAM_PROP);			\
267		if (prop == (pword *) NULL)				\
268		    prop = set_property(sdid, STREAM_PROP);		\
269		else							\
270		    stream_tid.free((stream_id) prop->val.wptr);	\
271		prop->tag.kernel = TPTR;				\
272		prop->val.wptr = (uword *) stream_tid.copy(nst);	\
273	}
274
275#define Set_New_Stream(did, nst)		\
276    (void) erase_property(did, STREAM_PROP); 	\
277    Set_Stream(did, nst)
278
279#define Check_Stream_Owner(nst)	\
280	if (StreamUnit(nst) != NO_UNIT && nst->fd_pid && nst->fd_pid != own_pid) \
281	{ Bip_Error(PERROR); }
282
283#define LocalStreams() (StreamDescriptors == stream_ids_)
284
285#define RoundTo(n,unit) ((n) - ((n) - 1) % (unit) -1 + (unit))
286
287/*
288 * Simple linear congruential random sequence generator used for scrambling
289 * streams. The constants are taken from B.Schneier, Applied Cryptography
290 */
291#define NextRand(r)	(((r)*4096 + 150889) % 714025)
292
293
294/*
295 * Macros for linked buffer chains (string streams and queues)
296 * The LOOKAHEAD bytes are being updated only when going forward one buffer,
297 * because only then the previous buffer may just have changed.
298 */
299
300#define StreamBufHeader(nst)	((linked_io_buffer_t*) StreamBuf(nst) - 1)
301#define StreamWBufHeader(nst)	((linked_io_buffer_t*) StreamWBuf(nst) - 1)
302#define BufHeader(buf)		((linked_io_buffer_t*) (buf) - 1)
303
304#define Advance_Buffer(nst) \
305	StreamBuf(nst) = StreamBufHeader(nst)->next; \
306	Copy_Bytes(StreamBuf(nst)-LOOKAHEAD, StreamPtr(nst)-LOOKAHEAD, LOOKAHEAD);  \
307	StreamCnt(nst) = StreamBufHeader(nst)->cnt; \
308	StreamPtr(nst) = StreamBuf(nst);
309
310#define Retreat_Buffer(nst) \
311	StreamBuf(nst) = StreamBufHeader(nst)->prev; \
312	StreamCnt(nst) = StreamBufHeader(nst)->cnt; \
313	StreamPtr(nst) = StreamBuf(nst);
314
315#define New_Buffer(nst, buf) { \
316	linked_io_buffer_t *buf_header = (linked_io_buffer_t *) hg_alloc(sizeof(linked_io_buffer_t) + StreamSize(nst) + 1); \
317	buf_header->next = buf_header->prev = 0; \
318	buf_header->cnt = 0; \
319	buf = (unsigned char *) (buf_header + 1); \
320	buf[0] = EOB_MARK; \
321}
322
323#define Append_New_Buffer(nst) { \
324	unsigned char *buf; \
325	New_Buffer(nst, buf); \
326	BufHeader(buf)->prev = StreamBuf(nst); \
327	StreamBufHeader(nst)->next = buf; \
328}
329
330#define Free_Prev_Buffer(nst) { \
331	unsigned char *empty_buf = BufHeader(StreamBuf(nst))->prev;  \
332	BufHeader(BufHeader(empty_buf)->prev)->next = StreamBuf(nst);  \
333	BufHeader(StreamBuf(nst))->prev = BufHeader(empty_buf)->prev;  \
334	hg_free((generic_ptr) BufHeader(empty_buf));  \
335}
336
337
338/*
339 * EXTERNAL VARIABLE DEFINITIONS:
340 */
341
342/*
343 * Stream  descriptors
344 */
345stream_id	stream_ids_[STREAM_MIN];
346stream_desc	stream_desc_structs_[STREAM_MIN];
347
348/*
349 * System streams. user is a conglomerate of current_input_ and current_output_
350 */
351
352stream_id	current_input_,		/* current streams */
353		current_output_,
354		current_err_,
355		log_output_,
356		warning_output_,
357
358		user_input_,		/* default streams */
359		user_output_,
360		user_err_,
361
362		null_;
363
364
365/* SICStus-like read hook */
366int (*E_read_hook)() = NULL;
367
368/*
369 * TYPES
370 */
371
372typedef struct linked_io_buffer {
373    unsigned char *		prev;
374    unsigned char *		next;
375    uword			cnt;
376    /* char			_lookahead[LOOKAHEAD];  but aligned: */
377    void_ptr			_lookahead[(LOOKAHEAD-1)/sizeof(void_ptr) + 1];
378} linked_io_buffer_t;
379
380
381io_channel_t
382	ec_file, ec_tty, ec_pipe, ec_socket, ec_queue_stream,
383	ec_string_stream, ec_null_stream;
384
385
386/*
387 * FUNCTION DEFINITIONS:
388 */
389
390static void	_free_stream(stream_id nst),
391		_init_fd_stream(stream_id nst, int unit, int mode, dident name, dident prompt, stream_id prompt_stream, int size);
392
393#if defined(HAVE_READLINE)
394static void	_resize_stream_buffer(stream_id nst, long newsize);
395#endif
396
397static int	_isafifo(int fd),
398		_local_io_close(stream_id nst),
399		_local_io_flush_out(stream_id nst),
400		_local_fill_buffer(stream_id nst),
401		_queue_fill_buffer(stream_id nst),
402		_string_fill_buffer(stream_id nst),
403		_local_tty_in(stream_id nst);
404
405
406
407/*
408 * FUNCTION NAME:	io_init()
409 *
410 * PARAMETERS:		NONE
411 *
412 * DESCRIPTION:		Initializes the system streams.
413 *
414 * This depends on the setting of ec_options.io_option:
415 *
416 * SHARED_IO	stream descriptors are in shared memory and actual I/O
417 *		 is done by the owner process of the file descriptor (default).
418 * OWN_IO	have streams in private memory so that every worker
419 *		 can have its private streams. This is mainly for
420 *		 debugging the parallel system.
421 * MEMORY_IO	Do not connect to stdin/stdout/stderr, but open all standard
422 *		 Eclipse streams as in-memory queues. This is for embedding.
423 */
424
425int
426io_init(int flags)
427{
428    int		i;
429
430    if ((ec_options.io_option == OWN_IO) && (flags & INIT_PRIVATE))
431    {
432	/*
433	 * Make a private array of stream descriptors
434	 */
435	StreamDescriptors = stream_ids_;
436	for (i = 0; i < STREAM_MIN; i++)
437	{
438	    StreamId(i) = &stream_desc_structs_[i];
439	    a_mutex_init(&StreamId(i)->lock);
440	    StreamMode(StreamId(i)) = SCLOSED;
441	    StreamNref(StreamId(i)) = 0;
442	    StreamNr(StreamId(i)) = i;
443	}
444	NbStreams = NbStreamsFree = STREAM_MIN;
445    }
446    if ((ec_options.io_option != OWN_IO) && (flags & INIT_SHARED))
447    {
448	/*
449	 * Make a shared array of stream descriptors
450	 */
451	StreamDescriptors = (stream_desc **)
452		hg_alloc_size(STREAM_MIN * sizeof(stream_desc *));
453	for (i = 0; i < STREAM_MIN; i++)
454	{
455	    StreamId(i) = (stream_desc *) hg_alloc_size(sizeof(stream_desc));
456	    a_mutex_init(&StreamId(i)->lock);
457	    StreamMode(StreamId(i)) = SCLOSED;
458	    StreamNref(StreamId(i)) = 0;
459	    StreamNr(StreamId(i)) = i;
460	}
461	NbStreams = NbStreamsFree = STREAM_MIN;
462    }
463    if (flags & INIT_PRIVATE)
464    {
465	/*
466	 * Initialize some private data
467	 */
468	current_input_ = user_input_ = StreamId(0);
469	current_output_ = warning_output_ = log_output_ = user_output_ = StreamId(1);
470	current_err_ = user_err_ = StreamId(2);
471	null_ = StreamId(3);
472    }
473    if (flags & INIT_SHARED)
474    {
475	Set_New_Stream(d_.input, current_input_);
476	Set_New_Stream(d_.output, current_output_);
477	Set_New_Stream(d_.err, current_err_);
478	Set_New_Stream(d_.warning_output, warning_output_);
479	Set_New_Stream(d_.log_output, log_output_);
480	Set_New_Stream(d_.null, null_);
481	Set_New_Stream(d_.stdin0, StreamId(0));
482	Set_New_Stream(d_.stdout0, StreamId(1));
483	Set_New_Stream(d_.stderr0, StreamId(2));
484	Set_New_Stream(d_.user_input, StreamId(0));
485	Set_New_Stream(d_.user_output, StreamId(1));
486	Set_New_Stream(d_.user_error, StreamId(2));
487    }
488
489    if ((ec_options.io_option != OWN_IO) && (flags & REINIT_SHARED))
490    {
491	/* The stream descriptors are in the form they were when the
492	 * save was done. The opened streams don't exist any more,
493	 * and the standard streams may be different,
494	 * therefore we set all open descriptors to closed.
495	 * Note that they can still have alias names.
496	 */
497	for (i = 0; i < NbStreams; i++)
498	{
499	    stream_id nst = StreamId(i);
500	    if (IsOpened(nst))
501	    {
502		_free_stream(nst);
503	    }
504	}
505    }
506
507    /*
508     * Create the standard Eclipse I/O streams
509     */
510    if (flags & ((ec_options.io_option == OWN_IO) ? INIT_PRIVATE : INIT_SHARED|REINIT_SHARED))
511    {
512	/*
513	 * Make the system streams (init or reinit)
514	 */
515	init_stream(null_,  NO_UNIT, SRDWR | SNULL | SSYSTEM, d_.null,
516	    NO_PROMPT, NO_STREAM, 0);
517
518	if (ec_options.io_option == MEMORY_IO)
519	{
520	    init_stream(current_input_, NO_UNIT, SREAD|SQUEUE|SSYSTEM|SYIELD,
521	    	D_UNKNOWN, NO_PROMPT, NO_STREAM, 0);
522	    init_stream(current_output_, NO_UNIT, SWRITE|SQUEUE|SSYSTEM|SFLUSHEOL|SYIELD,
523		D_UNKNOWN, NO_PROMPT, NO_STREAM, 0);
524	    init_stream(current_err_, NO_UNIT, SWRITE|SQUEUE|SSYSTEM|SFLUSHEOL|SYIELD,
525		D_UNKNOWN,  NO_PROMPT, NO_STREAM, 0);
526
527	}
528	else	/* connect to fd 0,1,2 */
529	{
530	    _init_fd_stream(current_input_,  0, SREAD|SSYSTEM,  d_.user,
531	    	in_dict(" ",0), current_output_, 0);
532	    _init_fd_stream(current_output_, 1, SWRITE|SSYSTEM, d_.user,
533	    	NO_PROMPT, NO_STREAM, 0);
534	    _init_fd_stream(current_err_,    2, SWRITE|SSYSTEM, d_.err,
535	    	NO_PROMPT, NO_STREAM, 0);
536	}
537    }
538    /* else we are attaching to shared memory: do nothing */
539
540    return PSUCCEED;
541}
542
543
544/*
545 * FUNCTION NAME:	init_stream
546 *
547 * PARAMETERS:
548 * 			nst -	Sepia stream number
549 *			unit -	OS file descriptor, NO_UNIT if a special one
550 *			mode -	Sepia i/o mode
551 *			name -	if a file, then its name, otherwise an acronym
552 *			prompt - the string to prompt with
553 *			prompt_stream - the stream where to print the prompt
554 *			size -	the size of the buffer or 0 if default
555 *				If buf != 0 then size must be its size
556 *
557 * DESCRIPTION:		Initialize a new stream. Fill the data into
558 *			the stream structure, allocate buffer(s)
559 *			if necessary.
560 */
561void
562init_stream(stream_id nst, int unit, int mode, dident name, dident prompt, stream_id prompt_stream, int size)
563{
564    unsigned char *buf;
565
566    if (!IsOpened(nst))
567    {
568        if (--NbStreamsFree <= 0)
569        {
570            NbStreamsFree = 0;  /* just in case it got out of sync */
571            Set_Tg_Soft_Lim(TG) /* initiate GC of stream handles */
572        }
573    }
574
575    StreamUnit(nst) = unit;
576    if (LocalStreams())
577    {
578	nst->fd_pid = 0;
579    }
580    else
581    {
582	/* stderr and stdout is handled in every process locally */
583	/* stdin and others are done on the fd owner */
584	nst->fd_pid = (unit == 1 || unit == 2) ? 0 : own_pid;
585    }
586    my_io_aport(&nst->aport);
587    a_mutex_init(&nst->lock);
588    StreamLine(nst) = 1;
589    StreamEncoding(nst) = SENC_DEFAULT;
590    StreamPromptStream(nst) = prompt_stream;
591    StreamPrompt(nst) = prompt;
592    StreamName(nst) = name;
593    StreamPath(nst) = D_UNKNOWN;
594    StreamCnt(nst) = 0;
595    StreamOffset(nst) = 0;
596    Make_Nil(&StreamEvent(nst));
597    StreamRand(nst) = 0;
598    StreamLastWritten(nst) = -1;
599    StreamOutputMode(nst) = DEFAULT_OUTPUT_MODE;
600    StreamPrintDepth(nst) = 0;
601    nst->signal_thread = 0;
602    switch (mode & STYPE)
603    {
604	case SSTRING:	nst->methods = (void_ptr) &ec_string_stream;	break;
605	case SPIPE:	nst->methods = (void_ptr) &ec_pipe;		break;
606	case SQUEUE:	nst->methods = (void_ptr) &ec_queue_stream;	break;
607	case SNULL:	nst->methods = (void_ptr) &ec_null_stream;	break;
608	case STTY:	nst->methods = (void_ptr) &ec_tty;		break;
609	case SSOCKET:	nst->methods = (void_ptr) &ec_socket;		break;
610	default:	nst->methods = (void_ptr) &ec_file;		break;
611    }
612    StreamMode(nst) = mode|StreamMethods(nst).mode_defaults;
613
614    /* Don't initialise StreamNref(nst) because it is a property of
615     * the descriptor, not of the stream proper. When the descriptor
616     * has just been obtained via find_free_stream(), it is 0 anyway.
617     * Otherwise the stream is only re-initialised, e.g. after restore. */
618
619    if (size == 0)
620    {
621	if (unit == NO_UNIT)
622	{
623	    size = StreamMethods(nst).buf_size_hint;
624	}
625	else
626	{
627#if defined(HAVE_ST_BLKSIZE)
628	    struct stat st;
629
630	    if(fstat(unit, &st) < 0 || st.st_blksize == 0)
631		/* if size not available, take default */
632		size = StreamMethods(nst).buf_size_hint;
633	    else
634		size = st.st_blksize;	/* else take the given size */
635#else
636	    size = StreamMethods(nst).buf_size_hint;
637#endif
638	}
639    }
640
641    StreamSize(nst) = size;
642    if (size == 0)
643    {
644	buf = 0;			/* no buffer, e.g. null stream */
645    }
646    else
647    {
648	/* allocate and initialise the I/O buffer (or buffer list) */
649	New_Buffer(nst, buf);
650	/* for queues, make a cyclic buffer list */
651	if (IsQueueStream(nst))
652	    BufHeader(buf)->next = BufHeader(buf)->prev = buf;
653    }
654    StreamBuf(nst) = StreamWBuf(nst) = buf;
655    if(IsReadStream(nst))
656    {
657	StreamLexAux(nst) = (unsigned char *) hg_alloc(BUFSIZE);
658	StreamLexSize(nst) = BUFSIZE;
659    }
660    else
661    {
662	StreamLexAux(nst) = NO_BUF;
663	StreamLexSize(nst) = 0;
664    }
665    StreamPtr(nst) = buf;
666}
667
668
669/* same as above, but the stream type is derived from the fd (unit) */
670
671static void
672_init_fd_stream(stream_id nst, int unit, int mode, dident name, dident prompt, stream_id prompt_stream, int size)
673{
674    if (isatty(unit)) {
675    	mode |= STTY;
676    } else {
677	if (errno == EBADF) {
678	    mode |= SNULL;
679	    unit = NO_UNIT;
680	} else if (_isafifo(unit)) {
681	    mode |= SPIPE;
682	} else {
683	    mode |= SFILE;
684	}
685	prompt = NO_PROMPT;
686	prompt_stream = NO_STREAM;
687	size = 0;
688    }
689    init_stream(nst, unit, mode, name, prompt, prompt_stream, size);
690}
691
692
693stream_id
694find_free_stream(void)
695{
696    int		i;
697    stream_id	nst;
698
699    for(i = 0; i < NbStreams; i++)
700    {
701	nst = StreamId(i);
702	if(!IsOpened(nst) && StreamNref(nst) == 0)
703	{
704	    return(nst);
705	}
706    }
707
708    if (LocalStreams())
709	return 0;		/* can't extend the static array */
710
711    StreamDescriptors = (stream_desc **) hg_realloc_size(
712		(generic_ptr) StreamDescriptors,
713		NbStreams * sizeof(stream_desc *),
714		(NbStreams + STREAM_INC) * sizeof(stream_desc *));
715    for (; i < NbStreams + STREAM_INC; i++)
716    {
717	StreamId(i) = (stream_desc *)  hg_alloc_size(sizeof(stream_desc));
718	StreamMode(StreamId(i)) = SCLOSED;
719	StreamNref(StreamId(i)) = 0;
720	StreamNr(StreamId(i)) = i;
721    }
722
723    nst = StreamId(NbStreams);
724    NbStreams += STREAM_INC;
725    NbStreamsFree += STREAM_INC;
726    return nst;
727}
728
729
730/*
731 * Support function for dictionary garbage collector
732 * Mark the DIDs in the stream descriptors
733 */
734
735void
736mark_dids_from_streams(void)
737{
738    int		i;
739    stream_id	nst;
740
741    for(i = 0; i < NbStreams; i++)
742    {
743	nst = StreamId(i);
744	if ((IsOpened(nst) || StreamNref(nst) > 0))
745	{
746	    if (StreamPrompt(nst) != D_UNKNOWN)	/* == SocketUnix */
747		Mark_Did(StreamPrompt(nst));
748	    if (StreamName(nst) != D_UNKNOWN)
749		Mark_Did(StreamName(nst));
750	    if (StreamPath(nst) != D_UNKNOWN)
751		Mark_Did(StreamPath(nst));
752	    mark_dids_from_pwords(&StreamEvent(nst), &StreamEvent(nst)+1);
753	}
754    }
755}
756
757
758stream_id
759ec_open_file(char *name, int mode, int *err)
760{
761    int		i = 0;
762    int		fd;
763    int		smode;
764    char	buf[MAX_PATH_LEN];
765    stream_id	nst;
766    io_channel_t *io_type;
767
768    /* translate the user mode to the corresponding system mode smode */
769    switch (mode)
770    {
771    case SREAD:			smode = O_RDONLY;			break;
772    case SWRITE:		smode = O_WRONLY | O_CREAT | O_TRUNC;	break;
773    case SRDWR:			smode = O_RDWR | O_CREAT;		break;
774    case SAPPEND:
775    case (SAPPEND|SWRITE):	smode = O_APPEND | O_CREAT | O_WRONLY;	break;
776    case (SAPPEND|SRDWR):	smode = O_APPEND | O_CREAT | O_RDWR;	break;
777    default:			*err = RANGE_ERROR; return NO_STREAM;
778    }
779
780    /* try to open the file (don't use absolute path if possible) */
781    (void) expand_filename(name, buf, EXPAND_STANDARD);
782    if ((fd = ec_open(buf, smode, 0666)) < 0) {
783	Set_Errno;
784	*err = SYS_ERROR;
785	return NO_STREAM;
786    }
787    io_type = isatty(fd) ? &ec_tty : &ec_file;
788
789    /* make the stream descriptor */
790    nst = find_free_stream();
791    init_stream(nst, fd, mode|io_type->io_type, enter_dict(name, 0), NO_PROMPT, NO_STREAM, 0);
792    (void) expand_filename(name, buf, EXPAND_ABSOLUTE);
793    StreamPath(nst) = enter_dict(buf, 0);
794    return(nst);
795}
796
797static void
798_free_stream(stream_id nst)
799{
800    if (!IsOpened(nst))
801        return;
802
803    if (StreamBuf(nst) != NO_BUF)
804    {
805	if (IsQueueStream(nst))
806	{
807	    /* free a circular list of buffers */
808	    unsigned char *first = StreamBuf(nst);
809	    unsigned char *next = StreamBuf(nst);
810	    do
811	    {
812		linked_io_buffer_t *this = BufHeader(next);
813		next = this->next;
814		hg_free((generic_ptr) this);
815	    } while (next != first);
816	}
817	else
818	{
819	    /* StreamBuf(nst) is either a single buffer, or a member
820	     * of a doubly linked list (in case of string stream) */
821	    unsigned char *prev = StreamBufHeader(nst)->prev;
822	    unsigned char *next = StreamBuf(nst);
823	    while (prev)
824	    {
825		linked_io_buffer_t *this = BufHeader(prev);
826		prev = this->prev;
827		hg_free((generic_ptr) this);
828	    }
829	    while (next)
830	    {
831		linked_io_buffer_t *this = BufHeader(next);
832		next = this->next;
833		hg_free((generic_ptr) this);
834	    }
835	}
836	StreamBuf(nst) = NO_BUF;
837    }
838    if (StreamLexAux(nst) != NO_BUF)
839    {
840	hg_free((generic_ptr)StreamLexAux(nst));
841	StreamLexAux(nst) = NO_BUF;
842    }
843    if (IsTag(StreamEvent(nst).tag.kernel, TPTR)) {
844	extern t_ext_type heap_event_tid;
845	heap_event_tid.free(StreamEvent(nst).val.wptr);
846	Make_Nil(&StreamEvent(nst));
847    }
848
849    StreamMode(nst) = SCLOSED;
850    ++NbStreamsFree;
851}
852
853/*
854 * Close a stream
855 * Options:
856 *	CLOSE_FORCE: free the stream even if there were errors
857 *	CLOSE_LOST: we are closing because the stream was lost (fail/gc)
858*/
859
860int
861ec_close_stream(stream_id nst, int options)
862{
863    int res = PSUCCEED;
864
865    if(!IsOpened(nst))
866	return STREAM_SPEC;
867
868    if (IsWriteStream(nst) && !IsQueueStream(nst))
869    {
870        int err = ec_flush(nst);
871        if ((err != PSUCCEED) && !(options & CLOSE_FORCE))
872            return err;
873	res = err;
874    }
875    /* Don't close the stdin, stdout and stderr file descriptors (SSYSTEM)
876     * because this very likely hangs the system. Moreover, we would then
877     * need a way to reopen them, e.g. after restoring a saved state.
878     */
879    if (StreamMode(nst) & SSYSTEM)
880        return res;
881
882    if (IsSocket(nst) && IsInvalidSocket(nst))
883        return res;	/* will be closed via paired SWRITE socket */
884
885    if (!(IsNullStream(nst) || IsStringStream(nst) || IsQueueStream(nst)))
886    {
887        int err = RemoteStream(nst) ? io_rpc(nst, IO_CLOSE) : _local_io_close(nst);
888        if ((err != PSUCCEED) && !(options & CLOSE_FORCE))
889            return err;
890	res = (res==PSUCCEED ? err : res);
891    }
892
893    if (IsSocket(nst))
894    {
895        if (SocketInputStream(nst))
896        {
897            --StreamNref(SocketInputStream(nst));
898            assert(StreamNref(SocketInputStream(nst)) == 0);
899            _free_stream(SocketInputStream(nst));
900        }
901        SocketConnection(nst) = 0;	/* otherwise we would try to free it */
902    }
903    else if(IsFileStream(nst) &&
904	( StreamMode(nst) & SDELETECLOSED ||
905	  StreamMode(nst) & SDELETELOST && options & CLOSE_LOST))
906    {
907	if (ec_unlink(DidName(StreamPath(nst))) < 0)
908	{
909	    Set_Errno
910	    if (!(options & CLOSE_FORCE)) return SYS_ERROR;
911	    res = (res==PSUCCEED ? SYS_ERROR : res);
912	}
913    }
914    _free_stream(nst);
915    return res;
916}
917
918
919/* Auxiliary function which has to be executed on the process
920 * that owns the file descriptor (possibly via rpc).
921 */
922
923#define THREAD_TIMEOUT 3000	/* milliseconds */
924
925static int
926_local_io_close(stream_id nst)
927{
928    int err;
929
930    Check_Stream_Owner(nst);
931
932    if (IsSocket(nst))
933    {
934	if (SocketUnix(nst) != D_UNKNOWN)
935	    (void) ec_unlink(DidName(SocketUnix(nst)));
936
937	(void) ec_stream_reset_sigio(nst, SWRITE);
938	if (SocketInputStream(nst))
939	    (void) ec_stream_reset_sigio(nst, SREAD);
940    }
941#if defined(HAVE_READLINE)
942    if (IsReadlineStream(nst)) {
943	if (fclose(StreamFILE(nst)) < 0)
944	{
945	    Set_Errno
946	    return(SYS_ERROR);
947	}
948    }
949#endif
950    if ((err = StreamMethods(nst).close(StreamUnit(nst))) != PSUCCEED)
951    {
952	return err;
953    }
954
955#if _WIN32
956    /* for sockets, clean up any signal-threads */
957    if (IsSocket(nst))
958    {
959	int dummy;
960	stream_id inst;
961	if (nst->signal_thread)
962	{
963	    ec_thread_wait(nst->signal_thread, &dummy, THREAD_TIMEOUT);
964	    if (ec_thread_terminate(nst->signal_thread, THREAD_TIMEOUT) < 0)
965		return SYS_ERROR;
966	    nst->signal_thread = 0;
967	}
968	if ((inst = SocketInputStream(nst)) && inst->signal_thread)
969	{
970	    ec_thread_wait(inst->signal_thread, &dummy, THREAD_TIMEOUT);
971	    if (ec_thread_terminate(inst->signal_thread, THREAD_TIMEOUT) < 0)
972		return SYS_ERROR;
973	    inst->signal_thread = 0;
974	}
975    }
976#endif
977    return(PSUCCEED);
978}
979
980
981reset_ttys_and_buffers(void)
982{
983    int i;
984
985    for (i = 0; i < NbStreams; i++)
986    {
987	stream_id nst = StreamId(i);
988	Lock_Stream(nst);
989	(void) ec_flush(nst);
990	Unlock_Stream(nst);
991    }
992}
993
994void
995flush_and_close_io(int own_streams_only)
996{
997    int         i;
998
999    for (i = 0; i < NbStreams; i++)
1000    {
1001	stream_id nst = StreamId(i);
1002	Lock_Stream(nst);
1003	if (!own_streams_only || !RemoteStream(nst))
1004	{
1005            (void) ec_close_stream(nst, CLOSE_FORCE|CLOSE_LOST);
1006	}
1007	Unlock_Stream(nst);
1008    }
1009}
1010
1011/**************** INPUT ****************/
1012
1013/*
1014 * FUNCTION NAME:	fill_buffer()
1015 *
1016 * PARAMETERS:		nst - stream identifier
1017 *
1018 * DESCRIPTION:
1019 *
1020 * Get a new buffer from the specified stream.
1021 * This function is called from lex_an() only when EOB_MARK is encountered,
1022 * and from ec_getch() if moreover Ptr - Buf >= Cnt
1023 * StreamPtr(nst) points to the EOB_MARK.
1024 */
1025int
1026fill_buffer(register stream_id nst)
1027{
1028    register unsigned char	*ptr;
1029    register unsigned char	*start;
1030    int i;
1031
1032    if (StreamPtr(nst) - StreamBuf(nst) < StreamCnt(nst))
1033	return PSUCCEED;	/* shouldn't happen */
1034
1035    /* Refilling after having returned EOF is only allowed for SoftEofStreams */
1036    if ((StreamMode(nst) & MEOF) && !IsSoftEofStream(nst))
1037	return PEOF;
1038
1039    if (IsStringStream(nst))
1040    {
1041	return _string_fill_buffer(nst);
1042    }
1043    else if (IsQueueStream(nst))
1044    {
1045	return _queue_fill_buffer(nst);
1046    }
1047
1048    /* Copy the last LOOKAHEAD characters in the buffer before its beginning
1049     * to enable the pushback in the lexer.
1050     */
1051    ptr = StreamPtr(nst);
1052    start = StreamBuf(nst);
1053    for (i = LOOKAHEAD; i > 0; --i)
1054    	*--start = *--ptr;
1055
1056    /* print the prompt if necessary */
1057    if (StreamMode(nst) & DONT_PROMPT)
1058	StreamMode(nst) ^= DONT_PROMPT;	/* but print further prompts */
1059    else if(StreamPromptStream(nst) != NO_STREAM)
1060    {
1061	(void) ec_outf(	StreamPromptStream(nst),
1062		DidName(StreamPrompt(nst)),
1063		DidLength(StreamPrompt(nst)));
1064	(void) StreamMethods(nst).flush(StreamPromptStream(nst));
1065    }
1066
1067    /* If the buffer has been modified, flush it before reading a new one. */
1068    (void) StreamMethods(nst).flush(nst);
1069
1070    return RemoteStream(nst) ? io_rpc(nst, IO_FILL): _local_fill_buffer(nst);
1071}
1072
1073
1074/* Auxiliary function which has to be executed on the process
1075 * that owns the file descriptor (possibly via rpc).
1076 */
1077static int
1078_local_fill_buffer(stream_id nst)
1079{
1080    int		count;
1081
1082    Check_Stream_Owner(nst);
1083    /* initialise the buffer properly, cause the read may be aborted */
1084    StreamOffset(nst) += StreamCnt(nst);
1085    StreamCnt(nst) = 0;
1086    StreamPtr(nst) = StreamBuf(nst);
1087    *StreamPtr(nst) = EOB_MARK;
1088
1089    if (E_read_hook != NULL)
1090	while (!(*E_read_hook)(StreamUnit(nst)));
1091
1092#if defined(HAVE_READLINE)
1093    if (IsReadlineStream(nst)) {
1094	char		*line;
1095	char		*expansion;
1096	int		res;
1097	extern FILE	*rl_instream;
1098	extern char	*readline();
1099	extern void	add_history();
1100	extern int	history_expand();
1101
1102	rl_instream = StreamFILE(nst);
1103	line = readline("");
1104	if (line == (char *) 0)
1105	    count = 0;
1106	else {
1107	    res = history_expand(line, &expansion);
1108	    if (res) {
1109		(void) ec_outfs(StreamPromptStream(nst), expansion);
1110		(void) ec_outfc(StreamPromptStream(nst), '\n');
1111		(void) ec_flush(StreamPromptStream(nst));
1112	    }
1113	    switch (res)
1114	    {
1115	    case 2:	/* :p - print only */
1116		/* add_history() is called in history_expand() */
1117		/* fall into */
1118	    case -1:	/* error in expansion */
1119		count = 1;
1120		(void) free(expansion);
1121		*line = '\0';
1122		break;
1123
1124	    case 1:	/* expansion done */
1125		(void) free(line);
1126		line = expansion;
1127		/* fall into */
1128	    case 0:	/* no expansion */
1129		count = strlen(line) + 1;
1130	    }
1131	    if (count > StreamSize(nst)) {
1132		_resize_stream_buffer(nst, (long) count);
1133	    }
1134	    (void) strcpy((char *) StreamBuf(nst), line);
1135	    StreamBuf(nst)[count - 1] = '\n';
1136	    StreamBuf(nst)[count] = EOB_MARK;
1137	    if (count > 1)
1138		add_history(line);
1139	    (void) free(line);
1140	}
1141	StreamBuf(nst)[count] = EOB_MARK;
1142    } else
1143#endif
1144    {
1145	int wanted = StreamSize(nst);
1146	count = StreamMethods(nst).read(StreamUnit(nst), (char *) StreamBuf(nst), wanted);
1147	/* Caution: Windows may store a ^Z in the buffer while returning 0... */
1148	StreamBuf(nst)[count] = EOB_MARK;
1149	if (count <= 0)
1150	{
1151	    StreamMode(nst) &= ~MREAD;
1152	    return count == 0 ? PEOF : SYS_ERROR;
1153	}
1154	else if (StreamMode(nst) & SSIGIO)
1155	{
1156	    ec_reenable_sigio(nst, wanted, count);
1157	}
1158    }
1159    StreamMode(nst) = (StreamMode(nst) & ~MEOF) | MREAD;
1160    StreamCnt(nst) = count;
1161
1162    if (StreamMode(nst) & SSCRAMBLE)
1163    {
1164	/*
1165	 * descramble the data we have just read
1166	 */
1167	int i;
1168	uint32 key = StreamRand(nst);
1169	for (i=0; i < count; ++i)
1170	{
1171	    uint8 plain;
1172	    key = NextRand(key);
1173	    StreamBuf(nst)[i] = plain = StreamBuf(nst)[i] ^ (key % 0xff);
1174	    key += plain;
1175	}
1176	StreamRand(nst) = key;
1177    }
1178
1179    return(PSUCCEED);
1180}
1181
1182
1183static int
1184_queue_fill_buffer(stream_id nst)
1185{
1186    if (StreamWBuf(nst) != StreamBuf(nst))
1187    {
1188	Advance_Buffer(nst)
1189	Free_Prev_Buffer(nst)
1190	if (StreamBuf(nst) != StreamWBuf(nst))
1191	    StreamOffset(nst) -= StreamCnt(nst);
1192	return PSUCCEED;
1193    }
1194    else
1195    {
1196	StreamMode(nst) &= ~MWRITE;	/* nothing more to read */
1197	return PEOF;
1198    }
1199}
1200
1201
1202static int
1203_string_fill_buffer(stream_id nst)
1204{
1205    if (StreamBufHeader(nst)->next)
1206    {
1207	StreamOffset(nst) += StreamCnt(nst);
1208	Advance_Buffer(nst)
1209	return PSUCCEED;
1210    }
1211    else
1212    {
1213	return PEOF;
1214    }
1215}
1216
1217
1218/*ARGSUSED*/
1219int
1220set_readline(stream_id nst)
1221{
1222#if defined(HAVE_READLINE)
1223    FILE	*f;
1224    extern char *rl_basic_word_break_characters;
1225    extern char *rl_completer_quote_characters;
1226    extern char *rl_readline_name;
1227    extern void	using_history(), stifle_history();
1228
1229    f = fdopen(StreamUnit(nst), "r");
1230    if (f == (FILE *) 0) {
1231	return SYS_ERROR;
1232    }
1233    nst->stdfile = (generic_ptr) f;
1234    StreamMode(nst) |= READLINE;
1235    using_history();
1236    stifle_history(100);
1237    /* we include all special and solo characters as word breakers,
1238       and '.' so that filenames are quoted */
1239    rl_basic_word_break_characters = " \t\n\"\\';.|&{}()[],!";
1240    rl_completer_quote_characters = "'\"";
1241    rl_readline_name = "eclipse";
1242#endif
1243    return PSUCCEED;
1244}
1245
1246/*
1247 * FUNCTION NAME:	ec_getch(nst)
1248 *
1249 * PARAMETERS:		nst -	input stream number
1250 *
1251 * DESCRIPTION:		Reads one character from the specified input
1252 * 			stream. If the buffer is empty, it calls fill_buffer
1253 *			to get a new one.
1254 *			MUST BE CALLED WITH STREAM LOCKED !!!
1255 */
1256int
1257ec_getch(stream_id nst)
1258{
1259    int			res;
1260
1261    if (nst == null_)
1262	return PEOF;
1263    if(!(IsReadStream(nst)))
1264	return(STREAM_MODE);
1265    if (StreamPtr(nst) - StreamBuf(nst) >= StreamCnt(nst))
1266    {
1267	if (fill_buffer(nst) != PSUCCEED)
1268	{
1269	    if (StreamMode(nst) & MEOF)
1270		return IsSoftEofStream(nst) ? PEOF : READ_PAST_EOF;
1271	    else
1272		StreamMode(nst) |= MEOF;
1273	    return PEOF;
1274	}
1275    }
1276    res = *StreamPtr(nst)++;
1277    if (res == '\n')
1278	StreamLine(nst)++;
1279    return(res);
1280}
1281
1282
1283int
1284ec_ungetch(stream_id nst)
1285{
1286    int		res;
1287
1288    if (nst == null_)
1289	return PSUCCEED;
1290    if(!(IsReadStream(nst)))
1291	return(STREAM_MODE);
1292
1293    if (StreamMode(nst) & MEOF)
1294    {
1295	/* if we had just read eof, simply forget this fact */
1296	StreamMode(nst) &= ~MEOF;
1297    }
1298    else
1299    {
1300	/*
1301	 * Ideally, we would allow exactly LOOKAHEAD bytes of unget
1302	 * (independent of current buffer alignment), except at the beginning
1303	 * of the stream, or after a seek.
1304	 * To implement this precisely is very difficult, because:
1305	 * - after seek, the buffer contents may or may not be valid (MREAD),
1306	 *   the lookahead bytes may be valid even if the buffer is invalid
1307	 * - we don't know the "beginning" of queues
1308	 * - we don't know how many consecutive ungets we had already
1309	 * We therefore just say that you can unget up to LOOKAHEAD bytes
1310	 * of what you have read immediately before, otherwise the result
1311	 * is undefined (you will read random data after an illegal unget).
1312	 */
1313	if (StreamPtr(nst) > StreamBuf(nst) - LOOKAHEAD)
1314	{
1315	    /* actually go back in the buffer (or the lookahead pre-buffer) */
1316	    --StreamPtr(nst);
1317	    if (*StreamPtr(nst) == '\n')
1318		StreamLine(nst)--;
1319	}
1320    }
1321    Succeed_;
1322}
1323
1324
1325/*
1326 * read the next n characters from the specified stream and return
1327 * a pointer to this string. This is an unterminated string!
1328 * *res is set to the number of available bytes (may be less than n).
1329 * If the string is completely in the stream input buffer, we return
1330 * a pointer into this buffer, otherwise a compact copy is constructed
1331 * in the lex_aux buffer and its address is returned.
1332 * If there was a problem reading, NULL is returned and *res is set
1333 * to the error code.
1334 */
1335
1336char *
1337ec_getstring(stream_id nst,
1338	word n,		/* number of bytes requested */
1339	word *res)	/* number of bytes read, or error code */
1340{
1341    register unsigned char	*pbuf, *paux;
1342    word			avail, nleft;
1343    word			lex_aux_size;
1344    int				err;
1345
1346    if (StreamPtr(nst) - StreamBuf(nst) >= StreamCnt(nst))
1347    {
1348	if (fill_buffer(nst) != PSUCCEED)
1349	{
1350	    if (StreamMode(nst) & MEOF) {
1351		*res = IsSoftEofStream(nst) ? PEOF : READ_PAST_EOF;
1352	    } else {
1353		StreamMode(nst) |= MEOF;
1354		*res = PEOF;
1355	    }
1356	    return NULL;
1357	}
1358    }
1359    pbuf = StreamPtr(nst);
1360    avail = StreamBuf(nst) + StreamCnt(nst) - pbuf;
1361
1362    if (avail >= n)	/* all requested bytes are in the buffer */
1363    {
1364	StreamPtr(nst) = pbuf + n;
1365	*res = n;
1366	return (char *) pbuf;
1367    }
1368
1369    /* we have to make a copy */
1370
1371    lex_aux_size = StreamLexSize(nst);
1372    if (n > lex_aux_size)	/* grow the lex_aux buffer, if necessary */
1373    {
1374	while (lex_aux_size < n)
1375		lex_aux_size *= 2;
1376	hg_free((generic_ptr) StreamLexAux(nst));
1377	StreamLexAux(nst) = (unsigned char *) hg_alloc((int)lex_aux_size);
1378	StreamLexSize(nst) = lex_aux_size;
1379    }
1380
1381    /* now copy the string into the lex_aux buffer */
1382
1383    paux = StreamLexAux(nst);
1384    nleft = n;
1385    while (avail < nleft)
1386    {
1387	nleft -= avail;
1388	while (avail--)
1389	    *paux++ = *pbuf++;
1390	StreamPtr(nst) = pbuf;	/* needed for fill_buffer() */
1391	err = fill_buffer(nst);
1392	switch(err)
1393	{
1394	case PSUCCEED:
1395	    break;
1396	case PEOF:
1397	    *res = n - nleft;
1398	    return (char *) StreamLexAux(nst);
1399	default:
1400	    *res = (word) err;
1401	    return NULL;
1402	}
1403	pbuf = StreamPtr(nst);
1404	avail = StreamBuf(nst) + StreamCnt(nst) - pbuf;
1405    }
1406    while (nleft--)
1407	*paux++ = *pbuf++;
1408    StreamPtr(nst) = pbuf;
1409    *res = n;
1410    return (char *) StreamLexAux(nst);
1411}
1412
1413
1414static int
1415_queue_read(stream_id nst, char *s, int count)
1416{
1417    int rem_count = count;
1418    int inbuf = StreamBuf(nst) + StreamCnt(nst) - StreamPtr(nst);
1419    unsigned char *ptr;
1420
1421    while (rem_count > inbuf)
1422    {
1423	/* copy the whole rest of this buffer */
1424	for(ptr = StreamPtr(nst); inbuf > 0; --inbuf,--rem_count)
1425	{
1426	    *s++ = *ptr++;
1427	}
1428	StreamPtr(nst) = ptr;
1429
1430	/* get next buffer if available */
1431	if (_queue_fill_buffer(nst) == PEOF)
1432	    return count-rem_count;	/* less than requested */
1433
1434	inbuf = StreamBuf(nst) + StreamCnt(nst) - StreamPtr(nst);
1435    }
1436
1437    /* copy the remaining wanted chars from the last buffer */
1438    for(ptr = StreamPtr(nst); rem_count > 0; --inbuf,--rem_count)
1439    {
1440	*s++ = *ptr++;
1441    }
1442    StreamPtr(nst) = ptr;
1443    if (inbuf == 0)
1444    {
1445	StreamMode(nst) &= ~MWRITE;	/* nothing more to read */
1446    }
1447    return count;
1448}
1449
1450
1451static int
1452_queue_content(stream_id nst, char *s)
1453{
1454    int count = 0;
1455    unsigned char *buf = StreamBuf(nst);
1456    unsigned char *ptr = StreamPtr(nst);
1457
1458    for(;;)
1459    {
1460	count += BufHeader(buf)->cnt - (ptr - buf);
1461
1462	while(ptr < buf + BufHeader(buf)->cnt)
1463	    *s++ = *ptr++;
1464
1465	if (buf == StreamWBuf(nst))
1466	    break;
1467
1468	ptr = buf = BufHeader(buf)->next;
1469    }
1470    return count;
1471}
1472
1473
1474static int
1475_queue_size(stream_id nst)
1476{
1477    if (StreamBuf(nst) == StreamWBuf(nst))
1478	return StreamOffset(nst) +
1479	    BufHeader(StreamWBuf(nst))->cnt - (StreamPtr(nst)-StreamBuf(nst));
1480    else
1481	return StreamOffset(nst) +
1482	    (StreamSize(nst)-(StreamPtr(nst)-StreamBuf(nst))) + BufHeader(StreamWBuf(nst))->cnt;
1483}
1484
1485static int
1486_string_size(stream_id nst)
1487{
1488    unsigned char *buf = StreamBuf(nst);
1489    int count = 0;
1490
1491    while (BufHeader(buf)->prev)
1492    	buf = BufHeader(buf)->prev;
1493
1494    do
1495    {
1496	count += BufHeader(buf)->cnt;
1497	buf = BufHeader(buf)->next;
1498    } while(buf);
1499
1500    return count;
1501}
1502
1503
1504static int
1505_string_content(stream_id nst, char *s)
1506{
1507    unsigned char *buf = StreamBuf(nst);
1508    unsigned char *ptr;
1509    int count = 0;
1510
1511    while (BufHeader(buf)->prev)
1512    	buf = BufHeader(buf)->prev;
1513
1514    do
1515    {
1516	count += BufHeader(buf)->cnt;
1517	for (ptr = buf; ptr < buf + BufHeader(buf)->cnt; )
1518	    *s++ = *ptr++;
1519	buf = BufHeader(buf)->next;
1520    } while(buf);
1521
1522    return count;
1523}
1524
1525
1526int Winapi
1527ec_queue_read(int qid, char *s, int count)
1528{
1529    stream_id nst = StreamId(qid);
1530    if (!IsOpened(nst))
1531	return STREAM_SPEC;
1532    if (!IsQueueStream(nst) || !IsWriteStream(nst))
1533	return STREAM_MODE;
1534    return _queue_read(nst, s, count);
1535}
1536
1537int Winapi
1538ec_queue_avail(int qid)
1539{
1540    stream_id nst = StreamId(qid);
1541    if (!IsOpened(nst))
1542	return STREAM_SPEC;
1543    if (!IsQueueStream(nst) || !IsWriteStream(nst))
1544	return STREAM_MODE;
1545    return _queue_size(nst);
1546}
1547
1548
1549/**************** OUTPUT ****************/
1550
1551static int
1552_queue_write(stream_id nst, char *s, int count)
1553{
1554    unsigned char *wptr, *wbuf, *new_buf;
1555    word i, bfree;
1556
1557    if (count == 0)
1558	return PSUCCEED;
1559
1560    /* raise event if writing to empty queue */
1561    if (StreamBuf(nst) == StreamWBuf(nst) && StreamPtr(nst) == StreamBuf(nst) + StreamCnt(nst))
1562    {
1563	StreamMode(nst) &= ~MEOF;
1564	if (!IsNil(StreamEvent(nst).tag))
1565	{
1566	    int res = ec_post_event(StreamEvent(nst));
1567	    Return_If_Error(res);
1568	}
1569    }
1570
1571    /* fill buffers to their end, and insert new ones if needed */
1572    wbuf = StreamWBuf(nst);
1573    wptr = wbuf + BufHeader(wbuf)->cnt;
1574    bfree = StreamSize(nst) - BufHeader(wbuf)->cnt;
1575    while (count > bfree)
1576    {
1577	while (bfree-- > 0)
1578	{
1579	    *wptr++ = *s++;
1580	    --count;
1581	}
1582	*wptr = EOB_MARK;
1583	BufHeader(wbuf)->cnt = wptr - wbuf;
1584	if (StreamBuf(nst) == wbuf)
1585	    StreamCnt(nst) = BufHeader(wbuf)->cnt;
1586	else
1587	    StreamOffset(nst) += BufHeader(wbuf)->cnt;
1588
1589	if (BufHeader(wbuf)->next != StreamBuf(nst))
1590	{
1591	    p_fprintf(current_err_, "Inconsistent buffer list in queue\n");
1592	    ec_flush(current_err_);
1593	}
1594	/* insert a new empty write buffer */
1595	New_Buffer(nst, new_buf);
1596	BufHeader(new_buf)->prev = wbuf;
1597	BufHeader(new_buf)->next = BufHeader(wbuf)->next;
1598	BufHeader(BufHeader(wbuf)->next)->prev = new_buf;
1599	BufHeader(wbuf)->next = new_buf;
1600	wptr = wbuf = new_buf;
1601	bfree = StreamSize(nst);
1602    }
1603
1604    /* Fill last buffer partially (count <= bfree) */
1605    for(i = BufHeader(wbuf)->cnt; i < BufHeader(wbuf)->cnt + count; ++i)
1606    	wbuf[i] = *s++;
1607    BufHeader(wbuf)->cnt += count;
1608    if (StreamBuf(nst) == wbuf)
1609	StreamCnt(nst) = BufHeader(wbuf)->cnt;
1610
1611    wbuf[BufHeader(wbuf)->cnt] = EOB_MARK;
1612    StreamWBuf(nst) = wbuf;
1613    StreamMode(nst) |= MWRITE;	/* there is something to read */
1614    return PSUCCEED;
1615}
1616
1617static int
1618_string_write(stream_id nst, char *s, int count)
1619{
1620    /* remaining space in the current buffer */
1621    int bfree = StreamSize(nst) - (StreamPtr(nst) - StreamBuf(nst));
1622
1623    /* fill buffers to their end, and append new ones */
1624    while (count > bfree)
1625    {
1626	while (bfree-- > 0)
1627	{
1628	    *(StreamPtr(nst))++ = *s++;
1629	    --count;
1630	}
1631	if (StreamPtr(nst) > StreamBuf(nst) + StreamCnt(nst))
1632	{
1633	    StreamCnt(nst) = StreamBufHeader(nst)->cnt = StreamPtr(nst) - StreamBuf(nst);
1634	    *StreamPtr(nst) = EOB_MARK;
1635	}
1636	if (!StreamBufHeader(nst)->next)
1637	{
1638	    Append_New_Buffer(nst);
1639	}
1640	StreamOffset(nst) += StreamCnt(nst);
1641	Advance_Buffer(nst);
1642	bfree = StreamSize(nst) - (StreamPtr(nst) - StreamBuf(nst));
1643    }
1644
1645    /* Fill last buffer partially (count <= bfree) */
1646    while (count-- > 0)
1647    {
1648    	*(StreamPtr(nst))++ = *s++;
1649    }
1650    if (StreamPtr(nst) > StreamBuf(nst) + StreamCnt(nst))
1651    {
1652	StreamCnt(nst) = StreamBufHeader(nst)->cnt = StreamPtr(nst) - StreamBuf(nst);
1653	*StreamPtr(nst) = EOB_MARK;
1654    }
1655    Succeed_;
1656}
1657
1658static int
1659_buffer_write(stream_id nst, char *s, int count)
1660{
1661    int bfree = StreamSize(nst) - (StreamPtr(nst) - StreamBuf(nst));
1662    int res = PSUCCEED;
1663
1664    if (bfree < 0 || bfree > StreamSize(nst))
1665    {
1666	/*
1667	 * Ptr points outside the buffer, we *must* be reading
1668	 * the lex_aux buffer, i.e. the buffer is MREAD.
1669	 */
1670	StreamPtr(nst) = StreamBuf(nst);
1671	bfree = StreamSize(nst);
1672    }
1673    while (count > 0)
1674    {
1675	int size = count < bfree ? count : bfree;
1676	count -= size;
1677	bfree -= size;
1678	while (size--)
1679	    *(StreamPtr(nst))++ = *s++;
1680	StreamMode(nst) |= MWRITE;
1681	if (count <= 0)
1682	    break;
1683	else if ((res = StreamMethods(nst).flush(nst)) != PSUCCEED)
1684	    break;
1685	/* for string streams StreamPtr(nst) != StreamBuf(nst) ! */
1686	bfree = StreamSize(nst) - (StreamPtr(nst) - StreamBuf(nst));
1687    }
1688    /*
1689     * If we are writing into a r/w stream whose buffer was not
1690     * read, we must mark that there is nothing to read for lex_an()
1691     */
1692    if (!(StreamMode(nst) & MREAD))
1693	*StreamPtr(nst) = EOB_MARK;
1694    return res;
1695}
1696
1697static int
1698_tty_write(stream_id nst, char *s, int count)
1699{
1700    if (IsTty(nst) && StreamMode(nst) & MREAD)
1701    {
1702	/*
1703	 * The tty is open in update mode and there is something
1704	 * in the buffer. We flush it, maybe discarding the rest.
1705	 */
1706	StreamPtr(nst) = StreamBuf(nst);
1707	StreamCnt(nst) = 0;
1708	StreamMode(nst) &= ~MREAD;
1709    }
1710    return _buffer_write(nst, s, count);
1711}
1712
1713/*ARGSUSED*/
1714static int
1715_null_write(stream_id nst, char *s, int count)
1716{
1717    Succeed_;
1718}
1719
1720int Winapi
1721ec_queue_write(int qid, char *s, int count)
1722{
1723    int res;
1724    stream_id nst = StreamId(qid);
1725    if (!IsOpened(nst))
1726	return STREAM_SPEC;
1727    if (!IsQueueStream(nst) || !IsReadStream(nst))
1728	return STREAM_MODE;
1729    res = _queue_write(nst, s, count);
1730    return res < 0 ? res : count;
1731}
1732
1733
1734/*
1735 * FUNCTION NAME:	ec_outf(nst, s, count)
1736 *
1737 * PARAMETERS:		nst -	output stream number
1738 *			s -	address of the area to output
1739 *			count -	number of bytes to output
1740 *
1741 * DESCRIPTION:		This is the basic output primitive that
1742 *			writes the specified number of bytes into the
1743 *			output buffer. The check for a full buffer
1744 *			is done at the beginning, i.e. after the output
1745 *			the buffer may be full.
1746 */
1747int
1748ec_outf(stream_id nst, const char *s, int count)
1749{
1750    if (!IsWriteStream(nst))
1751	return STREAM_MODE;
1752    if (count <= 0)
1753	return PSUCCEED;
1754    StreamLastWritten(nst) = s[count-1];
1755    return StreamMethods(nst).outf(nst, (char *) s, count);
1756}
1757
1758
1759/*
1760 * FUNCTION NAME:	ec_outfc(nst, c)
1761 *
1762 * PARAMETERS:		nst -	output stream number
1763 *			c - character to output
1764 *
1765 * DESCRIPTION:		Write one character into the output buffer.
1766 */
1767int
1768ec_outfc(stream_id nst, int c)
1769{
1770    char ch = c;
1771    if (!IsWriteStream(nst))
1772	return STREAM_MODE;
1773    StreamLastWritten(nst) = c;
1774    return StreamMethods(nst).outf(nst, &ch, 1);
1775}
1776
1777
1778/*
1779 * FUNCTION NAME:	ec_outfw(nst, w)
1780 *
1781 * PARAMETERS:		nst -	output stream number
1782 *			w - word to output
1783 *
1784 * DESCRIPTION:		Write one longword into the output buffer.
1785 *			StreamPtr is assumed to be longword-aligned!
1786 *			This is only used internally for profile_stream.
1787 *			Stream must be a file.
1788 */
1789int
1790ec_outfw(stream_id nst, word w)
1791{
1792    int			res = PSUCCEED;
1793    register char	*p;
1794    register char	*s;
1795    int			i;
1796
1797    if (StreamPtr(nst) == StreamBuf(nst) + StreamSize(nst))
1798	res = StreamMethods(nst).flush(nst);
1799    p = (char *) StreamPtr(nst);
1800    s = (char *) &w;
1801    for (i = 0; i < sizeof(word); i++)
1802	*p++ = *s++;
1803    StreamPtr(nst) = (unsigned char *) p;
1804    StreamMode(nst) |= MWRITE;
1805    return res;
1806}
1807
1808int
1809ec_newline(stream_id nst)
1810{
1811    int		res;
1812
1813    if (StreamMode(nst) & SEOLCR)
1814	res = ec_outf(nst, "\r\n", 2);
1815    else
1816	res = ec_outfc(nst, '\n');
1817    if (res != PSUCCEED)
1818	return res;
1819    if (StreamMode(nst) & SFLUSHEOL)
1820	return StreamMethods(nst).flush(nst);
1821    return PSUCCEED;
1822}
1823
1824int
1825ec_outfs(stream_id nst, const char *s)
1826{
1827    return ec_outf(nst, s, strlen(s)); /* could be more efficient */
1828}
1829
1830int
1831ec_flush(stream_id nst)
1832{
1833    if (!IsWriteStream(nst))
1834	return(STREAM_MODE);
1835    return StreamMethods(nst).flush(nst);
1836}
1837
1838
1839#if defined(HAVE_READLINE)
1840/*
1841 * Increase the size of a (string) stream buffer
1842 * Make sure that newsize >= StreamSize(nst) !
1843 * This function may modify StreamBuf, StreamPtr, StreamSize
1844 */
1845static void
1846_resize_stream_buffer(stream_id nst, word newsize)
1847{
1848    register word ptr_off = StreamPtr(nst) - StreamBuf(nst);
1849
1850    StreamBuf(nst) = (unsigned char *) (
1851			(linked_io_buffer_t*) hg_resize(
1852				(generic_ptr)(StreamBufHeader(nst)),
1853				(int)(sizeof(linked_io_buffer_t) + newsize + 1))
1854			+ 1);
1855    StreamBuf(nst)[newsize] = EOB_MARK;
1856    StreamSize(nst) = newsize;
1857    StreamPtr(nst) = StreamBuf(nst) + ptr_off;
1858}
1859#endif
1860
1861
1862/*
1863 * FUNCTION NAME:	..._flush(nst)
1864 *
1865 * PARAMETERS:		nst -	stream number
1866 *
1867 * DESCRIPTION:		Flushes the buffer which is supposed to
1868 *			contain some written data. If the buffer was originally
1869 *			read in, it has to seek back and write the whole
1870 *			buffer.
1871 */
1872
1873static int
1874_dummy_flush(stream_id nst)
1875{
1876    Succeed_;
1877}
1878
1879static int
1880_queue_flush(stream_id nst)
1881{
1882    if ((StreamMode(nst) & SYIELD) && (StreamMode(nst) & MWRITE))
1883    	return YIELD_ON_FLUSH_REQ;
1884    Succeed_;
1885}
1886
1887static int
1888_buffer_flush(stream_id nst)
1889{
1890    if (!(StreamMode(nst) & MWRITE))
1891    {
1892	Succeed_;
1893    }
1894    return RemoteStream(nst) ? io_rpc(nst, IO_FLUSH) : _local_io_flush_out(nst);
1895}
1896
1897/* Auxiliary function which has to be executed on the process
1898 * that owns the file descriptor (possibly via rpc).
1899 */
1900static int
1901_local_io_flush_out(stream_id nst)
1902{
1903    int		n;
1904
1905    Check_Stream_Owner(nst);
1906    /* save the last character */
1907    *(StreamBuf(nst) - 1) = *(StreamPtr(nst) - 1);
1908    if (StreamMode(nst) & MREAD && !IsTty(nst))
1909    {
1910	if (lseek(StreamUnit(nst), (long) - StreamCnt(nst), LSEEK_INCR) == -1)
1911	{
1912	    Set_Errno;
1913	    return(OUT_ERROR);
1914	}
1915	if (StreamPtr(nst) < StreamBuf(nst) + StreamCnt(nst))
1916	    StreamPtr(nst) = StreamBuf(nst) + StreamCnt(nst);
1917	StreamMode(nst) &= ~MREAD;
1918    }
1919    if (StreamMode(nst) & SSCRAMBLE)
1920    {
1921	/*
1922	 * scramble the data before writing it out
1923	 */
1924	char *scrambled_buf = hp_alloc(StreamSize(nst));
1925	int count = StreamPtr(nst) - StreamBuf(nst);
1926	uint32 key = StreamRand(nst);
1927	int i;
1928	for (i=0; i < count; ++i)
1929	{
1930	    uint8 plain = StreamBuf(nst)[i];
1931	    key = NextRand(key);
1932	    scrambled_buf[i] = plain ^ (key % 0xff);
1933	    key += plain;
1934	}
1935	StreamRand(nst) = key;
1936	n = StreamMethods(nst).write(StreamUnit(nst), scrambled_buf, count);
1937	hp_free(scrambled_buf);
1938    }
1939    else
1940    {
1941	n = StreamMethods(nst).write(StreamUnit(nst),
1942			(char *) StreamBuf(nst),
1943			StreamPtr(nst) - StreamBuf(nst));
1944    }
1945    if (n == PSUCCEED)
1946    {
1947	StreamOffset(nst) += StreamPtr(nst) - StreamBuf(nst);
1948	StreamPtr(nst) = StreamBuf(nst);
1949	StreamMode(nst) &= ~MWRITE;
1950	if (IsReadStream(nst))
1951	{
1952	    /* Mark the input buffer empty. */
1953	    StreamCnt(nst) = 0;
1954	    *StreamBuf(nst) = EOB_MARK;
1955	}
1956    }
1957    return(n);
1958}
1959
1960
1961/*
1962 * FUNCTION NAME:	ec_seek_stream(nst, pos, whence)
1963 *
1964 * PARAMETERS:		nst -	stream number
1965 *
1966 * DESCRIPTION:		Seek on a stream.
1967 */
1968
1969int
1970ec_seek_stream(stream_id nst, long int pos, int whence)
1971{
1972    return StreamMethods(nst).seek(nst, pos, whence);
1973}
1974
1975/*ARGSUSED*/
1976static int
1977_dummy_seek(stream_id nst, long int pos, int whence)
1978{
1979    return PSUCCEED;
1980}
1981
1982/*ARGSUSED*/
1983static int
1984_illegal_seek(stream_id nst, long int pos, int whence)
1985{
1986    return STREAM_MODE;
1987}
1988
1989static int
1990_string_seek(stream_id nst, long int pos, int whence)
1991{
1992    int i;
1993    if (pos < 0)
1994	return RANGE_ERROR;
1995
1996    if (whence == LSEEK_END)			/* seek to current end */
1997    {
1998	/* skip to the last buffer */
1999	while(StreamBufHeader(nst)->next)
2000	{
2001	    StreamOffset(nst) += StreamCnt(nst);
2002	    Advance_Buffer(nst)
2003	}
2004	StreamPtr(nst) = StreamBuf(nst) + StreamCnt(nst);
2005    }
2006    else if (pos <= StreamOffset(nst) + StreamCnt(nst)) /* seek backwards */
2007    {
2008	while(pos < StreamOffset(nst))
2009	{
2010	    Retreat_Buffer(nst)
2011	    StreamOffset(nst) -= StreamCnt(nst);
2012	}
2013	StreamPtr(nst) = StreamBuf(nst) + (pos - StreamOffset(nst));
2014    }
2015    else					/* seek forward */
2016    {
2017	/* skip forward over existing buffers */
2018	while(pos >= StreamOffset(nst) + StreamSize(nst) && StreamBufHeader(nst)->next)
2019	{
2020	    StreamOffset(nst) += StreamCnt(nst);
2021	    Advance_Buffer(nst)
2022	}
2023
2024	if (pos < StreamOffset(nst) + StreamCnt(nst))
2025	{
2026	    /* seek target is before the current end */
2027	    StreamPtr(nst) = StreamBuf(nst) + (pos - StreamOffset(nst));
2028	}
2029	else if (pos < StreamOffset(nst) + StreamSize(nst))
2030	{
2031	    /* stream gets extended within existing buffer */
2032	    if (!IsWriteStream(nst))
2033		return RANGE_ERROR;
2034	    for (i = StreamCnt(nst); i < pos - StreamOffset(nst); ++i)
2035		StreamBuf(nst)[i] = 0;
2036	    StreamCnt(nst) = BufHeader(StreamBuf(nst))->cnt = pos - StreamOffset(nst);
2037	    StreamBuf(nst)[StreamCnt(nst)] = EOB_MARK;
2038	    StreamPtr(nst) = StreamBuf(nst) + StreamCnt(nst);
2039	}
2040	else
2041	{
2042	    /* the stream must be extended with new buffers */
2043	    if (!IsWriteStream(nst))
2044		return RANGE_ERROR;
2045
2046	    /* zero-fill the rest of the last buffer so far */
2047	    for (i = StreamCnt(nst); i < StreamSize(nst); ++i)
2048		StreamBuf(nst)[i] = 0;
2049	    StreamCnt(nst) = BufHeader(StreamBuf(nst))->cnt = StreamSize(nst);
2050	    StreamBuf(nst)[StreamCnt(nst)] = EOB_MARK;
2051
2052	    /* append and zero-fill fresh buffers as necessary */
2053	    while(pos >= StreamOffset(nst) + StreamSize(nst))
2054	    {
2055		Append_New_Buffer(nst)
2056		StreamOffset(nst) += StreamCnt(nst);
2057		Advance_Buffer(nst)
2058		if (pos < StreamOffset(nst) + StreamSize(nst))
2059		    break;
2060		for (i = 0; i < StreamSize(nst); ++i)
2061		    StreamBuf(nst)[i] = 0;
2062		StreamCnt(nst) = BufHeader(StreamBuf(nst))->cnt = StreamSize(nst);
2063		StreamBuf(nst)[StreamCnt(nst)] = EOB_MARK;
2064	    }
2065
2066	    /* zero-fill new last buffer as far as necessary */
2067	    for (i = StreamCnt(nst); i < pos - StreamOffset(nst); ++i)
2068		StreamBuf(nst)[i] = 0;
2069	    StreamCnt(nst) = BufHeader(StreamBuf(nst))->cnt = pos - StreamOffset(nst);
2070	    StreamBuf(nst)[StreamCnt(nst)] = EOB_MARK;
2071
2072	    StreamPtr(nst) = StreamBuf(nst) + StreamCnt(nst);
2073	}
2074    }
2075    StreamMode(nst) &= ~MEOF;
2076    return PSUCCEED;
2077}
2078
2079static int
2080_file_seek(stream_id nst, long int pos, int whence)
2081{
2082    int		res;
2083    struct_stat	buf;
2084    long	max = -1;
2085    long	at;
2086
2087    if (!IsWriteStream(nst) || whence == LSEEK_END)	/* we need the length */
2088    {
2089	if (fstat(StreamUnit(nst), &buf) == 0)
2090	    max = buf.st_size;
2091	else
2092	{
2093	    Set_Errno;
2094	    Bip_Error(SYS_ERROR)
2095	}
2096    }
2097    if (whence == LSEEK_END)
2098    {
2099	pos = max;
2100    }
2101    else if (pos < 0 || max > 0 && pos > max)
2102    {
2103	Bip_Error(RANGE_ERROR)
2104    }
2105
2106    if (IsSocket(nst))
2107	nst = SocketInputStream(nst);
2108    at = StreamOffset(nst);
2109    /*
2110     * If the pointer stays inside an input buffer, we do not have
2111     * to seek in the real file.
2112     */
2113    if (StreamMode(nst) & MREAD && pos >= at && pos <= at + StreamCnt(nst))
2114    {
2115	StreamPtr(nst) = StreamBuf(nst) + pos - at;
2116    }
2117    else
2118    {
2119	if (!IsOpened(nst))
2120	{
2121	    Bip_Error(STREAM_MODE);
2122	}
2123	if (IsWriteStream(nst))
2124	    ec_flush(nst);
2125	at = lseek(StreamUnit(nst), pos, LSEEK_SET);
2126	StreamMode(nst) &= ~(MREAD | MWRITE);
2127	StreamPtr(nst) = StreamBuf(nst);
2128	*StreamPtr(nst) = EOB_MARK;
2129	StreamCnt(nst) = 0;
2130	StreamOffset(nst) = pos;
2131	if (at != pos)
2132	{
2133	    Set_Errno;
2134	    Bip_Error(SYS_ERROR);
2135	}
2136    }
2137    StreamMode(nst) &= ~MEOF;
2138    Succeed_;
2139}
2140
2141
2142/*
2143 * Allow seeking back to the beginning of current buffer.  We use this
2144 * to get left error context for error messages on non-seekable devices.
2145 */
2146
2147static int
2148_buffer_seek(stream_id nst, long int pos, int whence)
2149{
2150    int		res;
2151    struct_stat	buf;
2152    long	max = -1;
2153    long	buf_offset;
2154
2155    if (IsSocket(nst))
2156	nst = SocketInputStream(nst);
2157
2158    buf_offset = StreamOffset(nst);
2159    if (whence == LSEEK_END ||
2160	pos < buf_offset ||
2161	pos > buf_offset + (StreamPtr(nst)-StreamBuf(nst)))
2162    {
2163	Bip_Error(RANGE_ERROR)
2164    }
2165    StreamPtr(nst) = StreamBuf(nst) + pos - buf_offset;
2166    StreamMode(nst) &= ~MEOF;
2167    Succeed_;
2168}
2169
2170
2171/*
2172 * at (position in stream)
2173 */
2174
2175int
2176ec_stream_at(stream_id nst, long int *pos)
2177{
2178    return StreamMethods(nst).at(nst, pos);
2179}
2180
2181static int
2182_buffer_at(stream_id nst, long int *pos)
2183{
2184    *pos = StreamOffset(nst) + (StreamPtr(nst) - StreamBuf(nst));
2185    Succeed_;
2186}
2187
2188static int
2189_file_at(stream_id nst, long int *pos)
2190{
2191    if (StreamMode(nst) & SAPPEND)
2192    {
2193	struct_stat buf;
2194	if (fstat(StreamUnit(nst), &buf) == 0)
2195	    *pos = buf.st_size;
2196	else
2197	{
2198	    Set_Errno;
2199	    Bip_Error(SYS_ERROR)
2200	}
2201    }
2202    else
2203    {
2204	*pos = StreamOffset(nst) + (StreamPtr(nst) - StreamBuf(nst));
2205    }
2206    Succeed_;
2207}
2208
2209static int
2210_socket_at(stream_id nst, long int *pos)
2211{
2212    nst = SocketInputStream(nst);
2213    *pos = StreamOffset(nst) + (StreamPtr(nst) - StreamBuf(nst));
2214    Succeed_;
2215}
2216
2217static int
2218_dummy_at(stream_id nst, long int *pos)
2219{
2220    *pos = 0;
2221    Succeed_;
2222}
2223
2224/*
2225 * We consider queues as files whose beginning gets cut off on reading.
2226 * I.e. read-queues are always at 0, write-queues are always at the
2227 * position corresponding to the number of bytes currently in the queue.
2228 * (This can be used e.g. to find out how many bytes a write/2 produced)
2229 */
2230
2231static int
2232_queue_at(stream_id nst, long int *pos)
2233{
2234    if (IsWriteStream(nst))	/* write and read/write queues */
2235	*pos = _queue_size(nst);
2236    else
2237	*pos = 0;
2238    Succeed_;
2239}
2240
2241
2242
2243/*
2244 * nonempty - we have readable input in the buffer
2245 */
2246
2247static int
2248_string_nonempty(stream_id nst)
2249{
2250    return StreamPtr(nst) - StreamBuf(nst) < StreamCnt(nst)
2251    	|| StreamBufHeader(nst)->next;
2252}
2253
2254static int
2255_buffer_nonempty(stream_id nst)
2256{
2257    return StreamPtr(nst) - StreamBuf(nst) < StreamCnt(nst);
2258}
2259
2260
2261/*
2262 * at_eof
2263 */
2264
2265static int
2266_always_at_eof(stream_id nst)
2267{
2268    Succeed_;
2269}
2270
2271static int
2272_tty_at_eof(stream_id nst)
2273{
2274    Succeed_If(!_buffer_nonempty(nst));
2275}
2276
2277static int
2278_queue_at_eof(stream_id nst)
2279{
2280    Succeed_If(_queue_size(nst) == 0);
2281}
2282
2283static int
2284_string_at_eof(stream_id nst)
2285{
2286    Succeed_If(!_string_nonempty(nst));
2287}
2288
2289static int
2290_buffer_at_eof(stream_id nst)
2291{
2292    struct_stat buf;
2293    long	offset;
2294
2295    if (StreamMode(nst) & SAPPEND)
2296    {
2297	Succeed_;
2298    }
2299
2300#if defined(HAVE_FSTAT)
2301    if(fstat(StreamUnit(nst), &buf) < 0)
2302#else
2303    if(ec_stat(DidName(StreamPath(nst)), &buf) < 0)
2304#endif
2305    {
2306	Set_Errno
2307	Bip_Error(SYS_ERROR)
2308    }
2309    offset = StreamPtr(nst) - StreamBuf(nst);
2310#ifndef _WIN32
2311    /* check the fd directly, our own stream flags are not reliable: */
2312    if (S_ISSOCK(buf.st_mode) || S_ISFIFO(buf.st_mode))
2313    {
2314	Succeed_If((buf.st_size == 0 && offset == StreamCnt(nst)) ||
2315		StreamMode(nst) & MEOF);
2316    }
2317#endif
2318    Succeed_If(StreamOffset(nst) + offset == (unsigned) buf.st_size ||
2319	StreamMode(nst) & MEOF);
2320}
2321
2322
2323
2324/*
2325 * truncate a stream at the current (write) position
2326 * only makes sense on file and string streams
2327 */
2328
2329static int
2330_string_truncate(stream_id nst)
2331{
2332    unsigned char *next = StreamBufHeader(nst)->next;
2333    if (next)
2334    {
2335	do {
2336	    linked_io_buffer_t *this = BufHeader(next);
2337	    next = this->next;
2338	    hg_free((generic_ptr) this);
2339	} while (next);
2340	StreamBufHeader(nst)->next = (unsigned char) 0;
2341    }
2342    StreamCnt(nst) = StreamBufHeader(nst)->cnt = StreamPtr(nst) - StreamBuf(nst);
2343    *StreamPtr(nst) = EOB_MARK;
2344    Succeed_;
2345}
2346
2347
2348static int
2349_file_truncate(stream_id nst)
2350{
2351#ifdef _WIN32
2352    /* On windows, we need to flush in order to set
2353     * the real file pointer to the truncate position
2354     */
2355    int res = _buffer_flush(nst);
2356    Return_If_Error(res);
2357    if (ec_truncate(StreamUnit(nst)))
2358    {
2359	Bip_Error(SYS_ERROR);
2360    }
2361#else
2362    if (ftruncate(StreamUnit(nst),
2363    	(off_t) StreamOffset(nst) + (StreamPtr(nst) - StreamBuf(nst))))
2364    {
2365	Set_Errno
2366	Bip_Error(SYS_ERROR)
2367    }
2368#endif
2369    StreamCnt(nst) = StreamPtr(nst) - StreamBuf(nst);
2370    *StreamPtr(nst) = EOB_MARK;
2371    Succeed_;
2372}
2373
2374
2375
2376/*
2377 * Output to a Prolog stream in the form of printf(). Machines that
2378 * have no vsprintf() (currently VAX and Sequent) must do some hacking here.
2379 * It just simulates sprintf().
2380 */
2381#ifdef STDC_HEADERS
2382#include <stdarg.h>
2383int
2384p_fprintf(stream_id nst, char *fmt, ...)
2385{
2386    va_list	args;
2387    char	ibuf[BUFSIZE];
2388    int		res;
2389
2390    va_start(args, fmt);
2391#ifdef HAVE_VSNPRINTF
2392#ifdef _WIN32
2393    res = _vsnprintf(ibuf, BUFSIZE, fmt, args);
2394#else
2395    res = vsnprintf(ibuf, BUFSIZE, fmt, args);
2396#endif
2397#else
2398    res = vsprintf(ibuf, fmt, args);
2399#endif
2400    va_end(args);
2401    if (res < 0 || res >= BUFSIZE)
2402	res = BUFSIZE;			/* truncate */
2403    return ec_outf(nst, ibuf, res);
2404}
2405#else
2406#ifndef HAVE_VARARGS
2407/*VARARGS2*/
2408p_fprintf(nst, fmt, args)
2409stream_id	nst;
2410char		*fmt;
2411{
2412    FILE dummy;		/* needed for _doprnt */
2413    char ibuf[BUFSIZE];
2414
2415    dummy._cnt = 32767;
2416    dummy._ptr = ibuf;
2417    dummy._base = ibuf;
2418    dummy._flag = _IOWRT+_IOSTRG;
2419    _doprnt(fmt, &args, &dummy);
2420    return ec_outf(nst, ibuf, dummy._ptr - ibuf);
2421}
2422#else
2423#include <varargs.h>
2424/*VARARGS0*/
2425p_fprintf(va_alist)
2426va_dcl
2427{
2428    va_list	args;
2429    stream_id	nst;
2430    char	*fmt;
2431    char	ibuf[BUFSIZE];
2432
2433    va_start(args);
2434    nst = va_arg(args,stream_id);
2435    fmt = va_arg(args,char *);
2436    (void) vsprintf(ibuf,fmt,args);
2437    va_end(args);
2438    return ec_outf(nst, ibuf,strlen(ibuf));
2439}
2440#endif /* VARARGS */
2441#endif /* STDC_HEADERS */
2442
2443
2444/**************** RAW TTY PRIMITIVES ****************/
2445
2446#ifndef _WIN32
2447
2448/*
2449 * FUNCTION NAME:	_set_raw_tty(fd, min, time)
2450 *
2451 * PARAMETERS:		fd	- file descriptor
2452 *
2453 * DESCRIPTION:		Uses an ioctl(2) system call to set the specified
2454 *			file descriptor which must be a terminal into raw mode.
2455 *
2456 */
2457
2458
2459/*ARGSUSED*/
2460static int
2461_set_raw_tty(int fd, int min, int time, struct termios *tbuf)
2462{
2463#ifndef _WIN32
2464    Termio	rawbuf;
2465
2466    if (isatty(fd))
2467    {
2468#ifdef TERMIO_POSIX_STYLE
2469	if (tcgetattr(fd, tbuf) == -1)
2470#else
2471	if (ioctl(fd, GetTermAttr, tbuf) == -1)
2472#endif
2473	{
2474	    Set_Errno;
2475	    return SYS_ERROR;
2476	}
2477	rawbuf = *tbuf;
2478
2479#if defined(TERMIO_POSIX_STYLE) || defined(TERMIO_SYS_V_STYLE)
2480	rawbuf.c_iflag = tbuf->c_iflag & ~(INLCR | ICRNL);
2481	rawbuf.c_oflag = tbuf->c_oflag & ~OPOST;
2482	rawbuf.c_lflag = tbuf->c_lflag &
2483			    ~(ICANON | ECHO | ECHOE | ECHOK | ECHONL);
2484	rawbuf.c_cc[VMIN] = min;
2485	rawbuf.c_cc[VTIME] = time;
2486#endif
2487#ifdef TERMIO_BSD_STYLE
2488	rawbuf.sg_flags = (tbuf->sg_flags | CBREAK) & ~ECHO;
2489#endif
2490
2491#ifdef TERMIO_POSIX_STYLE
2492	if (tcsetattr(fd, TCSANOW, &rawbuf) == -1)
2493#else
2494	if (ioctl(fd, SetTermAttr, &rawbuf) == -1)
2495#endif
2496	{
2497	    Set_Errno;
2498	    return SYS_ERROR;
2499	}
2500    }
2501#endif
2502    return PSUCCEED;
2503}
2504
2505
2506/*
2507 * FUNCTION NAME:	_unset_raw_tty(fd)
2508 *
2509 * PARAMETERS:		fd	- file descriptor
2510 *
2511 * DESCRIPTION:		Uses an ioctl(2) system call to set the specified
2512 *			file descriptor which must be a terminal back
2513 *			into normal mode. Since a global variable is used
2514 *			to store the normal mode, this function is not
2515 *			safe for interrupts, but only if you use
2516 *			raw i/o for two terminals with different setting,
2517 *			which is rather rare.
2518 */
2519
2520static int
2521_unset_raw_tty(int fd, struct termios *tbuf)
2522{
2523#ifndef _WIN32
2524    if (isatty(fd))
2525    {
2526#ifdef TERMIO_POSIX_STYLE
2527	if (tcsetattr(fd, TCSANOW, tbuf) == -1)
2528#else
2529	if (ioctl(fd, SetTermAttr, tbuf) == -1)
2530#endif
2531	{
2532	    Set_Errno;
2533	    return SYS_ERROR;
2534	}
2535    }
2536#endif
2537    return PSUCCEED;
2538}
2539
2540#endif
2541
2542/*
2543 * FUNCTION NAME:	ec_tty_in(nst)
2544 *
2545 * PARAMETERS:		nst -	input stream
2546 *
2547 * DESCRIPTION:
2548 *
2549 * Raw input from a stream. Since we switch between cooked and raw
2550 * mode to execute this function, all unread data will be discarded.
2551 * This is a bug in the whole concept.
2552 */
2553int
2554ec_tty_in(stream_id nst)
2555{
2556    if (!IsReadStream(nst))
2557	return(STREAM_MODE);
2558    if (!IsTty(nst))
2559    {
2560	/* suppress prompting during raw input */
2561	int saved_flag = StreamMode(nst) & DONT_PROMPT;
2562	int res;
2563	StreamMode(nst) |= DONT_PROMPT;
2564	res = ec_getch(nst);
2565	StreamMode(nst) = (StreamMode(nst) & ~DONT_PROMPT) | saved_flag;
2566	return res;
2567    }
2568    return RemoteStream(nst) ? io_rpc(nst, IO_TTYIN) : _local_tty_in(nst);
2569}
2570
2571/* Auxiliary function which has to be executed on the process
2572 * that owns the file descriptor (possibly via rpc).
2573 */
2574static int
2575_local_tty_in(stream_id nst)
2576{
2577#ifdef _WIN32
2578    return ec_getch_raw(StreamUnit(nst));
2579#else
2580    int			n;
2581    char		c;
2582    int			res;
2583    Termio		tbuf;
2584#if defined(TERMIO_POSIX_STYLE) || defined(TERMIO_SYS_V_STYLE)
2585    Termio		rawbuf;
2586    char		buf[10];
2587#endif
2588
2589    Check_Stream_Owner(nst);
2590    /* The loop is to allow signals being handled during the read */
2591    /* We handle only after resetting raw mode, in case the handler longjmps */
2592    for (;;)
2593    {
2594	Disable_Int()
2595	errno = 0;
2596	if ((res = _set_raw_tty(StreamUnit(nst), 1, 0, &tbuf)) != PSUCCEED)
2597	{
2598	    Enable_Int()
2599	    return res;
2600	}
2601	if (E_read_hook != NULL)
2602	{
2603	    while (!(*E_read_hook)(StreamUnit(nst)))
2604	    {}
2605	}
2606#if defined(TERMIO_POSIX_STYLE) || defined(TERMIO_SYS_V_STYLE)
2607	n = read(StreamUnit(nst), &c, 1);
2608	if (n == 1 && c == 27)	/* escape read */
2609	{
2610	    (void) _set_raw_tty(StreamUnit(nst), 0, 2, &rawbuf);
2611	    n = read(StreamUnit(nst), &buf[0], 2);
2612	    if (n == 2 && buf[0] == '[')
2613		c = buf[1];
2614	    else
2615		n = 1;
2616	}
2617#endif
2618#ifdef TERMIO_BSD_STYLE
2619	n = 1 << StreamUnit(nst);
2620	n = select(StreamUnit(nst) + 1,(fd_set *) &n,
2621		(fd_set *) 0,(fd_set *) 0, (struct timeval *) 0);
2622	if (n == 1)
2623	    n = read(StreamUnit(nst), &c, 1);
2624#endif
2625	if (errno != EINTR)
2626	    break;
2627	res = _unset_raw_tty(StreamUnit(nst), &tbuf);
2628	Enable_Int()
2629	if (res != PSUCCEED)
2630	    return res;
2631    }
2632    res = _unset_raw_tty(StreamUnit(nst), &tbuf);
2633    Enable_Int()
2634    if (res != PSUCCEED)
2635	return res;
2636    if(n < 1)
2637    {
2638	Set_Errno
2639	return(SYS_ERROR);
2640    }
2641    return (int) c;
2642#endif
2643}
2644
2645int
2646ec_tty_outs(stream_id nst, char *s, int n)
2647{
2648    if(!(IsWriteStream(nst)))
2649	return(STREAM_MODE);
2650    if(!(IsTty(nst)))
2651	return(ec_outf(nst,s,n));
2652
2653    {
2654#ifdef _WIN32
2655	for(; n>0; --n)
2656	    (void) ec_putch_raw(*s++);
2657	return PSUCCEED;
2658#else
2659	int res;
2660	Termio	tbuf;
2661
2662	Disable_Int()
2663	if ((res = _set_raw_tty(StreamUnit(nst), 1, 0, &tbuf)) != PSUCCEED)
2664	{
2665	    Enable_Int()
2666	    return res;
2667	}
2668	res = StreamMethods(nst).write(StreamUnit(nst), s, n);
2669	res = _unset_raw_tty(StreamUnit(nst), &tbuf);
2670	Enable_Int();
2671	return(res);
2672#endif
2673    }
2674}
2675
2676int
2677ec_tty_out(stream_id nst, int c)
2678{
2679    if(!(IsWriteStream(nst)))
2680	return(STREAM_MODE);
2681    if(!(IsTty(nst)))
2682	return(ec_outfc(nst,c));
2683
2684    {
2685#ifdef _WIN32
2686	(void) ec_putch_raw(c);
2687	return PSUCCEED;
2688#else
2689	char s = c;
2690	return ec_tty_outs(nst, &s, 1);
2691#endif
2692    }
2693}
2694
2695
2696/*
2697 * A general routine which changes the setting of a symbolic stream.
2698 * It checks for the system-defined streams and updates them correspondingly.
2699 * The first argument must be the DID of the symbolic name.
2700 */
2701int
2702set_stream(dident name, stream_id neww)
2703{
2704    value	vv1;
2705    stream_id	old;
2706    int		res;
2707
2708    vv1.did = name;
2709    old = get_stream_id(vv1, tdict, 0, &res);
2710
2711    if(!(IsOpened(neww)))
2712	return(STREAM_SPEC);
2713    if (old == neww)
2714	return PSUCCEED;			/* nothing to do */
2715
2716    /* Check the mode of the new channel for special streams */
2717
2718    if(name == d_.user || name == d_.null)
2719	return(SYSTEM_STREAM);
2720    else if(name == d_.input)
2721    {
2722	if(!IsReadStream(neww))
2723	    return(STREAM_MODE);
2724	current_input_ = neww;
2725    }
2726    else if(name == d_.output)
2727    {
2728	if(!IsWriteStream(neww))
2729	    return(STREAM_MODE);
2730	current_output_ = neww;
2731    }
2732    else if(name == d_.err)
2733    {
2734	if(!IsWriteStream(neww))
2735	    return(STREAM_MODE);
2736	current_err_ = neww;
2737    }
2738    else if(name == d_.warning_output)
2739    {
2740	if(!IsWriteStream(neww))
2741	    return(STREAM_MODE);
2742	warning_output_ = neww;
2743    }
2744    else if(name == d_.log_output)
2745    {
2746	if(!IsWriteStream(neww))
2747	    return(STREAM_MODE);
2748	log_output_ = neww;
2749    }
2750    else if(name == d_.user_input)
2751    {
2752	if(!IsReadStream(neww))
2753	    return(STREAM_MODE);
2754	user_input_ = neww;
2755    }
2756    else if(name == d_.user_output)
2757    {
2758	if(!IsWriteStream(neww))
2759	    return(STREAM_MODE);
2760	user_output_ = neww;
2761    }
2762    else if(name == d_.user_error)
2763    {
2764	if(!IsWriteStream(neww))
2765	    return(STREAM_MODE);
2766	user_err_ = neww;
2767    }
2768    /*
2769     * stdin,stdout,stderr should not be changeable, but there are cases
2770     * where we want to do it once, e.g. when starting a remote server.
2771     * Undocumented: allow it when user_xxx has been set beforehand.
2772     */
2773    else if(name == d_.stdin0)
2774    {
2775	if(neww != user_input_)
2776	    return(SYSTEM_STREAM);
2777	if(!IsReadStream(neww))
2778	    return(STREAM_MODE);
2779	StreamMode(neww) |= SSYSTEM;
2780    }
2781    else if(name == d_.stdout0 || name == d_.stderr0)
2782    {
2783	if(neww != (name==d_.stdout0? user_output_ : user_err_))
2784	    return(SYSTEM_STREAM);
2785	if(!IsWriteStream(neww))
2786	    return(STREAM_MODE);
2787	StreamMode(neww) |= SSYSTEM;
2788    }
2789    /* And now change the stream */
2790    Set_Stream(name, neww);
2791    return PSUCCEED;
2792}
2793
2794#if defined(HAVE_PUSHBACK) && !defined(HAVE_READLINE)
2795pushback_char(int fd, char *p)
2796{
2797    (void) ioctl(fd, TIOCSTI, p);
2798}
2799#endif
2800
2801
2802int
2803ec_is_sigio_stream(stream_id nst, int sock_dir)
2804{
2805    if (IsSocket(nst)  &&  IsWriteStream(nst)  &&  sock_dir == SREAD)
2806	nst = SocketInputStream(nst);	/* Handle the read direction */
2807    return (StreamMode(nst) & SSIGIO);
2808}
2809
2810
2811int
2812ec_stream_set_sigio(stream_id nst, int sock_dir)
2813{
2814    if (IsSocket(nst)  &&  IsWriteStream(nst)  &&  sock_dir == SREAD)
2815	nst = SocketInputStream(nst);	/* Handle the read direction */
2816    if (!(StreamMode(nst) & SSIGIO))
2817    {
2818#ifndef _WIN32
2819	int res = set_sigio(StreamUnit(nst));
2820#else
2821	int res = ec_setup_stream_sigio_thread(nst);
2822#endif
2823	Return_If_Error(res);
2824	StreamMode(nst) |= SSIGIO;
2825    }
2826    return PSUCCEED;
2827}
2828
2829
2830int
2831ec_stream_reset_sigio(stream_id nst, int sock_dir)
2832{
2833    if (IsSocket(nst)  &&  IsWriteStream(nst)  &&  sock_dir == SREAD)
2834	nst = SocketInputStream(nst);	/* Handle the read direction */
2835    if (StreamMode(nst) & SSIGIO)
2836    {
2837#ifndef _WIN32
2838	int res = reset_sigio(StreamUnit(nst));
2839	Return_If_Error(res);
2840#endif
2841	StreamMode(nst) &= ~SSIGIO;
2842    }
2843    return PSUCCEED;
2844}
2845
2846
2847int
2848set_sigio(int fd)
2849{
2850#ifdef SIGIO_FASYNC
2851    int		i;
2852    if (fcntl(fd, F_SETOWN, getpid()) == -1 ||
2853	(i = fcntl(fd, F_GETFL, 0)) == -1)
2854    {
2855	Set_Errno;
2856	return SYS_ERROR;
2857    }
2858    /* FASYNC enables signaling the pgrp when data ready */
2859    if (fcntl(fd, F_SETFL, i | FASYNC) == -1) {
2860	Set_Errno;
2861	return SYS_ERROR;
2862    }
2863#endif
2864#ifdef SIGIO_SETSIG
2865    /* see manual streamio(7) */
2866    if (ioctl(fd, I_SETSIG, S_RDNORM|S_RDBAND|S_HIPRI|S_BANDURG) == -1) {
2867	Set_Errno;
2868	return SYS_ERROR;
2869    }
2870#endif
2871#ifdef SIGIO_FIOASYNC
2872    int		on = 1;
2873    int		pid = (int) getpid();
2874
2875    /* set the process receiving SIGIO/SIGURG signals to us */
2876    if (ioctl(fd, SIOCSPGRP, &pid) == -1)
2877    {
2878	Set_Errno;
2879	return SYS_ERROR;
2880    }
2881    /* allow receipt of asynchronous I/O signals */
2882    if (ioctl(fd, FIOASYNC, &on) == -1)
2883    {
2884	Set_Errno;
2885	return SYS_ERROR;
2886    }
2887#endif
2888    return PSUCCEED;
2889}
2890
2891int
2892reset_sigio(int fd)
2893{
2894#ifdef SIGIO_FASYNC
2895    int		i;
2896
2897    if ((i = fcntl(fd, F_GETFL, 0)) == -1)
2898    {
2899	Set_Errno;
2900	return SYS_ERROR;
2901    }
2902
2903    if (fcntl(fd, F_SETFL, i & ~FASYNC) == -1) {
2904	Set_Errno;
2905	return SYS_ERROR;
2906    }
2907#endif
2908#ifdef SIGIO_SETSIG
2909    if (ioctl(fd, I_SETSIG, 0) == -1) {
2910	Set_Errno;
2911	return SYS_ERROR;
2912    }
2913#endif
2914#ifdef SIGIO_FIOASYNC
2915    int		off = 0;
2916
2917    /* allow receipt of asynchronous I/O signals */
2918    if (ioctl(fd, FIOASYNC, &off) == -1)
2919    {
2920	Set_Errno;
2921	return SYS_ERROR;
2922    }
2923#endif
2924    return PSUCCEED;
2925}
2926
2927static int
2928_isafifo(int fd)
2929{
2930#if !defined(_WIN32) || defined(S_IFIFO)
2931    struct_stat st;
2932
2933    return
2934	fd != NO_UNIT &&
2935	fstat(fd, &st) != -1 &&
2936	( (st.st_mode & S_IFMT) == S_IFIFO
2937#if defined(SOCKETS) && defined(S_IFSOCK)
2938	|| (st.st_mode & S_IFMT) == S_IFSOCK
2939#endif
2940	);
2941#else
2942    return 0;
2943#endif
2944}
2945
2946
2947/*
2948 * Dispatch function for all I/O related RPCs
2949 */
2950int
2951do_io_action(stream_id nst, int action)
2952{
2953    switch (action)
2954    {
2955    case IO_FLUSH:
2956	return _local_io_flush_out(nst);
2957    case IO_FILL:
2958	return _local_fill_buffer(nst);
2959    case IO_CLOSE:
2960	return _local_io_close(nst);
2961    case IO_TTYIN:
2962	return _local_tty_in(nst);
2963    case IO_BIND:
2964    case IO_CONNECT:
2965    case IO_LISTEN:
2966    case IO_ACCEPT:
2967	return STREAM_MODE;	/* not yet possible */
2968    }
2969    return MPS_ERROR;
2970}
2971
2972
2973/*
2974 * New I/O OS layer
2975 */
2976
2977static int
2978_dummy_close(int fd)
2979{
2980    return PSUCCEED;
2981}
2982
2983static int
2984_dummy_io(int fd, char *buf, int n)
2985{
2986    return PSUCCEED;
2987}
2988
2989static int
2990_dummy_size(stream_id nst)
2991{
2992    return 0;
2993}
2994
2995static int
2996_dummy_content(stream_id nst, char *buf)
2997{
2998    return 0;
2999}
3000
3001
3002static int
3003_close_fd(int fd)
3004{
3005    if (close(fd) < 0)
3006    {
3007	Set_Errno;
3008	return SYS_ERROR;
3009    }
3010    return PSUCCEED;
3011}
3012
3013/*
3014 * The basic output primitive which uses the write(2) system call to output
3015 * the buffer or raw data. If it has been interrupted by an interrupt,
3016 * the system call is restarted.
3017 */
3018static int
3019_write_fd(int fd, char *buf, int n)
3020{
3021    int		cnt = 0;
3022
3023    for (;;)
3024    {
3025	cnt = write(fd, buf, n);
3026	if (cnt == n)
3027	    return PSUCCEED;
3028	else if (cnt < 0 )
3029	{
3030#ifdef EINTR
3031	    if (errno == EINTR)
3032		continue;	/* an interrupted call, try again */
3033#endif
3034	    Set_Errno
3035	    return OUT_ERROR;
3036	}
3037	else
3038	{
3039	    n -= cnt;
3040	    buf += cnt;
3041	}
3042    }
3043}
3044
3045static int
3046_read_fd(int fd, char *buf, int n)
3047{
3048    int count;
3049
3050    for (;;)
3051    {
3052	count = read(fd, buf, n);
3053	if (count < 0)
3054	{
3055#ifdef EINTR
3056	    if (errno == EINTR)
3057		continue;	/* an interrupted call, try again */
3058#endif
3059	    Set_Errno
3060	}
3061	return count;
3062    }
3063}
3064
3065io_channel_t	ec_file = {
3066    SFILE,		/*io_type*/
3067#ifdef _WIN32
3068    SREPOSITION|SCOMPRESS|SEOLCR,	/*mode_defaults*/
3069#else
3070    SREPOSITION|SCOMPRESS|SSELECTABLE,	/*mode_defaults*/
3071#endif
3072    BUFSIZE,		/*buf_size_hint*/
3073    _close_fd,		/*close*/
3074    _dummy_io,		/*ready*/
3075    _read_fd,		/*read*/
3076    _write_fd,		/*write*/
3077    _file_at,		/*at*/
3078    _buffer_at_eof,	/*at_eof*/
3079    _buffer_nonempty,	/*buffer_nonempty*/
3080    _file_truncate,	/*truncate*/
3081    _file_seek,		/*seek*/
3082    _buffer_flush,	/*flush*/
3083    _dummy_size,	/*size*/
3084    _dummy_content,	/*content*/
3085    _buffer_write	/*outf*/
3086};
3087
3088io_channel_t	ec_pipe = {
3089    SPIPE,		/*io_type*/
3090#ifdef _WIN32
3091    SEOF_RESET|SCOMPRESS|SEOLCR,		/*mode_defaults*/
3092#else
3093    SEOF_RESET|SCOMPRESS|SSELECTABLE,	/*mode_defaults*/
3094#endif
3095    BUFSIZE,		/*buf_size_hint*/
3096    _close_fd,		/*close*/
3097    _dummy_io,		/*ready*/
3098    _read_fd,		/*read*/
3099    _write_fd,		/*write*/
3100    _buffer_at,		/*at*/
3101    _buffer_at_eof,	/*at_eof*/
3102    _buffer_nonempty,	/*buffer_nonempty*/
3103    _dummy_io,		/*truncate*/
3104    _buffer_seek,	/*seek*/
3105    _buffer_flush,	/*flush*/
3106    _dummy_size,	/*size*/
3107    _dummy_content,	/*content*/
3108    _buffer_write	/*outf*/
3109};
3110
3111io_channel_t	ec_tty = {
3112    STTY,		/*io_type*/
3113#ifdef _WIN32
3114    SEOF_RESET|SFLUSHEOL|SEOLCR,		/*mode_defaults*/
3115#else
3116    SEOF_RESET|SFLUSHEOL|SSELECTABLE,	/*mode_defaults*/
3117#endif
3118    TTY_BUF_SIZE,	/*buf_size_hint*/
3119    _close_fd,		/*close*/
3120    _dummy_io,		/*ready*/
3121    _read_fd,		/*read*/
3122    _write_fd,		/*write*/
3123    _buffer_at,		/*at*/
3124    _buffer_nonempty,	/*buffer_nonempty*/
3125    _tty_at_eof,	/*at_eof*/
3126    _dummy_io,		/*truncate*/
3127    _buffer_seek,	/*seek*/
3128    _buffer_flush,	/*flush*/
3129    _dummy_size,	/*size*/
3130    _dummy_content,	/*content*/
3131    _tty_write		/*outf*/
3132};
3133
3134io_channel_t	ec_null_stream = {
3135    SNULL,		/*io_type*/
3136    SEOF_RESET|SREPOSITION|SSELECTABLE,	/*mode_defaults*/
3137    0,			/*buf_size_hint*/
3138    _dummy_close,	/*close*/
3139    _dummy_io,		/*ready*/
3140    _dummy_io,		/*read*/
3141    _dummy_io,		/*write*/
3142    _dummy_at,		/*at*/
3143    _always_at_eof,	/*at_eof*/
3144    _dummy_size,	/*buffer_nonempty*/
3145    _dummy_io,		/*truncate*/
3146    _dummy_seek,	/*seek*/
3147    _dummy_flush,	/*flush*/
3148    _dummy_size,	/*size*/
3149    _dummy_content,	/*content*/
3150    _null_write		/*outf*/
3151};
3152
3153extern int ec_write_socket(int, char *, int);
3154extern int ec_read_socket(int, char *, int);
3155extern int ec_close_socket(int);
3156
3157io_channel_t	ec_socket = {
3158    SSOCKET,		/*io_type*/
3159#ifdef _WIN32
3160    SEOF_RESET|SEOLCR|SCOMPRESS|SSELECTABLE,	/*mode_defaults*/
3161#else
3162    SEOF_RESET|SCOMPRESS|SSELECTABLE,		/*mode_defaults*/
3163#endif
3164    BUFSIZE,		/*buf_size_hint*/
3165    ec_close_socket,	/*close*/
3166    _dummy_io,		/*ready*/
3167    ec_read_socket,	/*read*/
3168    ec_write_socket,	/*write*/
3169    _socket_at,		/*at*/
3170    _buffer_at_eof,	/*at_eof*/
3171    _buffer_nonempty,	/*buffer_nonempty*/
3172    _dummy_io,		/*truncate*/
3173    _buffer_seek,	/*seek*/
3174    _buffer_flush,	/*flush*/
3175    _dummy_size,	/*size*/
3176    _dummy_content,	/*content*/
3177    _buffer_write	/*outf*/
3178};
3179
3180io_channel_t	ec_string_stream = {
3181    SSTRING,		/*io_type*/
3182    MREAD|SREPOSITION|SSELECTABLE,	/*mode_defaults*/
3183    1024,		/*buf_size_hint*/
3184    _dummy_close,	/*close*/
3185    _dummy_io,		/*ready*/
3186    _dummy_io,		/*read*/
3187    _dummy_io,		/*write*/
3188    _buffer_at,		/*at*/
3189    _string_at_eof,	/*at_eof*/
3190    _string_nonempty,	/*buffer_nonempty*/
3191    _string_truncate,	/*truncate*/
3192    _string_seek,	/*seek*/
3193    _dummy_flush,	/*flush*/
3194    _string_size,	/*size*/
3195    _string_content,	/*content*/
3196    _string_write	/*outf*/
3197};
3198
3199io_channel_t	ec_queue_stream = {
3200    SQUEUE,		/*io_type*/
3201    SEOF_RESET|MREAD|SSELECTABLE,	/*mode_defaults*/
3202    1024,		/*buf_size_hint*/
3203    _dummy_close,	/*close*/
3204    _dummy_io,		/*ready*/
3205    _dummy_io,		/*read*/
3206    _dummy_io,		/*write*/
3207    _queue_at,		/*at*/
3208    _queue_at_eof,	/*at_eof*/
3209    _queue_size,	/*buffer_nonempty*/
3210    _dummy_io,		/*truncate*/
3211    _illegal_seek,	/*seek*/
3212    _queue_flush,	/*flush*/
3213    _queue_size,	/*size*/
3214    _queue_content,	/*content*/
3215    _queue_write	/*outf*/
3216};
3217
3218