1(*
2    Title:      Standard Basis Library: StreamIO functor
3    Copyright   David C.J. Matthews 2000, 2005
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License as published by the Free Software Foundation; either
8    version 2.1 of the License, or (at your option) any later version.
9    
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14    
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18*)
19
20functor BasicStreamIO(
21    structure PrimIO : PRIM_IO
22    structure Vector : MONO_VECTOR
23    structure Array : MONO_ARRAY
24    structure VectorSlice: MONO_VECTOR_SLICE
25    structure ArraySlice: MONO_ARRAY_SLICE
26    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
27    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
28    sharing type PrimIO.array = Array.array = ArraySlice.array
29    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
30    sharing type PrimIO.array_slice = ArraySlice.slice
31    val someElem : PrimIO.elem
32    ):
33    sig
34    include STREAM_IO
35    (* Note: This is non-standard but enables us to define
36       the derived BinIO and TextIO structures more efficiently. *)
37    val outputVec: outstream * PrimIO.vector_slice -> unit
38    end =
39struct
40    open IO
41    type vector = Vector.vector
42    type elem = PrimIO.elem
43    datatype reader = datatype PrimIO.reader
44    datatype writer = datatype PrimIO.writer
45    type array = Array.array
46    type pos = PrimIO.pos
47
48    exception Interrupt = RunCall.Interrupt
49
50    (* Called after any exception in the lower level reader or
51       writer to map any exception other than Io into Io. *)
52    fun mapToIo (io as Io _, _, _) = io
53      | mapToIo (Interrupt, _, _) = Interrupt
54      | mapToIo (nonIo, name, caller) =
55            Io { name = name, function = caller, cause = nonIo }
56
57    val emptyVec = Vector.fromList [] (* Represents end-of-stream. *)
58
59    datatype instream =
60        (* The usual state of a stream: We may have to read from the reader
61           before we have any real data or we may have already read. *)
62        Uncommitted of { state: streamState ref,
63                         locker: Thread.Mutex.mutex }
64        (* If we know we have unread input we can return this as the
65           stream.  That allows part of the stream to be read without
66           locking.  This is an optimisation. *)
67      | Committed of
68           { vec: vector, offset: int, rest: instream, startPos: pos option }
69
70    and streamState =
71        Truncated (* The stream has been closed or truncated. *)
72    |   HaveRead of (* A vector has been read from the stream.  If the
73                       vector has size zero this is treated as EOF.
74                       startPos is the position when the vector was
75                       read. *)
76            {vec: vector, rest: streamState ref, startPos: pos option }
77    |   ToRead of reader (* We have not yet closed or truncated the stream. *)
78
79   
80    and outstream =
81        OutStream of {
82            wrtr: writer,
83            buffType : IO.buffer_mode ref,
84            buf: array,
85            bufp: int ref,
86            isTerm: bool ref,
87            locker: Thread.Mutex.mutex
88            }
89
90    datatype out_pos = OutPos of outstream * pos
91
92    (* Create a new stream from the vector and the reader. *)
93    fun mkInstream (r, v: vector): instream =
94    let
95        val readRest =
96           Uncommitted { state = ref (ToRead r), locker = Thread.Mutex.mutex() }
97        (* If the vector is non-empty the first entry is as though the
98           vector had been read otherwise it's just the reader. *)
99    in
100        if Vector.length v = 0
101        then readRest
102        else Committed { vec = v, offset = 0, rest = readRest, startPos = NONE }
103    end
104
105    local
106        fun input' (ref (HaveRead {vec, rest, ...}), locker) =
107            let
108                (* TODO: If we have already read further on we could convert
109                   these entries to Committed. *)
110            in
111                (vec, Uncommitted{ state = rest, locker = locker })
112            end
113
114          | input' (s as ref Truncated, locker) = (* Truncated: return end-of-stream *)
115                   (emptyVec, Uncommitted{ state = s, locker = locker })
116
117          | input' (state as
118                       ref(readMore as ToRead (RD {chunkSize, readVec = SOME readVec, getPos, ...})),
119                       locker) =
120            let
121                (* We've not yet read this.  Try reading from the reader. *)
122                val startPos =
123                   case getPos of SOME g => SOME(g()) | NONE => NONE
124                val data = readVec chunkSize
125                (* Create a reference to the reader which will be updated by
126                   the next read.  The ref is shared between the existing stream
127                   and the new one so reading on either adds to the same chain. *)
128                val nextLink = ref readMore
129                val nextChunk =
130                    HaveRead {vec = data, rest = nextLink, startPos = startPos}
131            in
132                (* Extend the stream by adding this vector to the list of chunks read so far. *)
133                state := nextChunk;
134                (* Return a new stream which continues reading. *)
135                (data, Uncommitted { state = nextLink, locker = locker })
136            end
137
138          | input' (ref(ToRead(RD{name, ...})), _) =
139                (* readVec missing in reader. *)
140                raise Io { name = name, function = "input", cause = BlockingNotSupported }
141
142         fun inputNList' (ref (HaveRead {vec, rest, startPos}), locker, n) =
143            let
144                val vecLength = Vector.length vec
145            in
146                if vecLength = 0 (* End-of-stream: Return next in list. *)
147                then ([vec], Uncommitted{ state = rest, locker = locker })
148                else if n < vecLength
149                then (* We can use what's already been read.  The stream we return allows us
150                        to read the rest without blocking. *)
151                    ([VectorSlice.vector(VectorSlice.slice(vec, 0, SOME n))],
152                     Committed{ vec = vec, offset = n, startPos = startPos,
153                         rest = Uncommitted{ state = rest, locker = locker}  })
154                else if n = vecLength
155                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
156                    ([vec], Uncommitted{ state = rest, locker = locker})
157                else (* Have to get the next item *)
158                let
159                    val (nextVecs, nextStream) = inputNList' (rest, locker, n - vecLength)
160                in
161                    (vec :: nextVecs, nextStream)
162                end
163            end
164
165         | inputNList' (s as ref Truncated, locker, _) =
166                (* Truncated: return end-of-stream *)
167                   ([emptyVec], Uncommitted{ state = s, locker = locker })
168   
169         | inputNList' (f, locker, n) = (* ToRead *)
170             let
171                 val (vec, f') = input' (f, locker)
172             in
173                 if Vector.length vec = 0
174                 then ([vec], f') (* Truncated or end-of-file. *)
175                 else inputNList' (f, locker, n) (* Reread *)
176             end
177
178    in
179        fun input (Uncommitted { state, locker }) =
180                LibraryIOSupport.protect locker input' (state, locker)
181
182        |   input (Committed { vec, offset, rest, ... }) =
183              (* This stream was produced from re-reading a stream that already
184                 had data.  We can return the result without the overhead of locking. *)
185                (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)), rest)
186
187        fun inputNList (Uncommitted { state, locker }, n) =
188                LibraryIOSupport.protect locker inputNList' (state, locker, n)
189
190        |   inputNList (Committed { vec, offset, rest, startPos }, n) =
191            let
192                val vecLength = Vector.length vec
193            in
194                if vecLength = 0 (* End-of-stream: Return next in list. *)
195                then ([vec], rest)
196                else if n < vecLength - offset
197                then (* We can use what's already been read.  Next entry is a committed
198                        stream that returns the part we haven't yet used. *)
199                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, SOME n))],
200                     Committed{ vec = vec, offset = offset+n, rest = rest, startPos = startPos })
201                else if n = vecLength - offset
202                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
203                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, NONE))], rest)
204                else (* Have to get the next item *)
205                let
206                    val (nextVecs, nextStream) = inputNList (rest, n - (vecLength - offset))
207                in
208                    (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)) :: nextVecs,
209                     nextStream)
210                end
211            end
212
213        fun inputN (f, n) =
214        if n < 0
215        then raise Size
216        else if n = 0 (* Defined to return the empty vector and f *)
217        then (emptyVec, f)
218        else
219        let
220            val (vecs, f') = inputNList (f, n)
221        in
222            (Vector.concat vecs, f')
223        end
224
225        (* Read the whole of the remaining input until we get an EOF. *)
226        fun inputAll f =
227        let
228            (* Find out the size of the file. *)
229            fun getSize(n, f) =
230            let
231                val (v, f') = input f
232                val vSize = Vector.length v
233            in
234                if vSize = 0
235                then n (* Reached EOF. *)
236                else getSize (n + vSize, f')
237            end
238        in
239            (* Read the whole file. *)
240            inputN(f, getSize(0,f))
241        end
242
243        (* Note a crucial difference between inputN and input1.  Because input1
244           does not return a stream if it detects EOF it cannot advance beyond
245           a temporary EOF in a stream. *)
246        fun input1 (Committed { vec, offset, rest, startPos }) =
247            let
248                val vecSize = Vector.length vec
249            in
250                if vecSize = 0
251                then NONE
252                else if vecSize = offset+1
253                then SOME(Vector.sub(vec, offset), rest)
254                else SOME(Vector.sub(vec, offset),
255                       Committed{ vec = vec, offset = offset+1, rest = rest, startPos = startPos })
256            end
257
258        |   input1 f =
259            let
260                val (s, f') = inputN (f, 1)
261            in
262                if Vector.length s = 0
263                then NONE
264                else SOME(Vector.sub(s, 0), f')
265            end
266
267    end
268
269    local
270        fun doClose (ref (HaveRead {rest, ...})) = doClose rest
271          | doClose (ref Truncated) = ()
272          | doClose (state as ref (ToRead (RD{close, ...}))) =
273              (state := Truncated; close())
274    in
275        fun closeIn (Uncommitted { state, locker }) = LibraryIOSupport.protect locker doClose state
276          | closeIn (Committed { rest, ...}) = closeIn rest
277    end
278
279    local
280        (* Return the reader. *) 
281        fun getReader' (ref (HaveRead {rest, ...})) = getReader' rest
282        |   getReader' (ref Truncated) =
283                raise Io { name = "",  function = "getReader", cause = ClosedStream }
284        |   getReader' (state as ref (ToRead reader)) =
285                (state := Truncated; reader)
286    in
287        fun getReader'' (Uncommitted { state, locker }) =
288                LibraryIOSupport.protect locker getReader' state
289        |   getReader'' (Committed { rest, ... }) = getReader'' rest
290
291        fun getReader f =
292        let
293            val reader = getReader'' f
294            val (allInput, _) = inputAll f
295        in
296            (* Return the reader along with buffered input.  It's not clear
297               what to do if there are EOFs in the stream.  The book says the
298               result is the result of inputAll which takes everything up to the
299               first EOF.  *)
300            (reader, allInput)
301        end
302    end
303    
304    local
305        (* Check that the stream is not terminated and then convert a file position
306           plus a vector offset into a file position.  In particular, if the reader
307           has converted CRNL into NL we don't have a simple relationship between
308           elements and file offsets. *)
309        fun findPosition'(startPos, offset, HaveRead {rest=ref rest, ...}) =
310                findPosition'(startPos, offset, rest)
311        |   findPosition'(_, _, Truncated) =
312                raise Io { name = "",  function = "filePosIn", cause = ClosedStream }
313        |   findPosition'(startPos, offset,
314                ToRead (RD { getPos = SOME getPos, setPos = SOME setPos,
315                             readVec = SOME readVec, ...})) =
316                if offset = 0
317                then startPos (* Easy *)
318                else
319                    (* When we read this vector we recorded the file position of
320                       the beginning only.  To find the file position of the
321                       particular element we actually need to read the portion of
322                       the input up to that element and find out the file position
323                       at that point. *)
324                let
325                    val savep = getPos() (* Save current position. *)
326                    (* Move to the point where we read the vector. *)
327                    val () = setPos startPos;
328                    (* Call readVec until we have read the required number
329                       of elements.  N.B.  Ganser & Reppy has a bug here.
330                       There is no guarantee that readVec n will actually
331                       return n elements so it's unsafe to assume that it
332                       will move the file pointer by n elements. *)
333                    fun doRead n =
334                    let
335                        val read = Vector.length(readVec n)
336                    in
337                        if read = n orelse read = 0 (* Error? *)
338                        then ()
339                        else doRead (n - read)
340                    end
341                    (* Read the offset number of elements. *)
342                    val () = doRead offset;
343                    (* Record the position after actually reading the elements. *)
344                    val position = getPos();
345                in
346                    setPos savep; (* Restore. *)
347                    position
348                end
349        |   findPosition'(_, _, ToRead _) =
350                raise Io { name = "",  function = "filePosIn",
351                           cause = RandomAccessNotSupported }
352
353        fun findPosition(startPos, offset, Committed { rest, ... }) =
354                findPosition(startPos, offset, rest)
355        |   findPosition(startPos, offset, Uncommitted { state = ref state, locker }) =
356                LibraryIOSupport.protect locker findPosition' (startPos, offset, state)
357
358        fun filePosIn' (HaveRead {rest=ref rest, startPos = SOME startPos, ...}) =
359                findPosition'(startPos, 0, rest)
360        |   filePosIn' (HaveRead {startPos = NONE, ...}) =
361                raise Io { name = "",  function = "filePosIn",
362                           cause = RandomAccessNotSupported }
363        |   filePosIn' Truncated =
364                raise Io { name = "",  function = "filePosIn", cause = ClosedStream }
365        |   filePosIn' (ToRead(RD { getPos = SOME getPos, ...})) = getPos()
366        |   filePosIn' (ToRead _) =
367                raise Io { name = "",  function = "filePosIn",
368                           cause = RandomAccessNotSupported }
369
370    in
371        (* Find the first entry to get the position. *)
372        fun filePosIn (Uncommitted { state = ref state, locker }) =
373                LibraryIOSupport.protect locker filePosIn' state
374        |   filePosIn (Committed { offset, rest, startPos = SOME startPos, ... }) =
375                findPosition(startPos, offset, rest)
376        |   filePosIn (Committed { startPos = NONE, ... }) =
377              (* This can occur either because the reader doesn't support getPos or
378                 because the position is within the initial vector passed to
379                 mkInstream. *)
380                raise Io { name = "",  function = "filePosIn",
381                           cause = RandomAccessNotSupported }
382    end
383    
384    local
385        fun doCanInput' (ref (HaveRead {vec, rest, ...}), locker, n, k) =
386            let
387                val vecLength = Vector.length vec
388            in
389                if vecLength = 0
390                then SOME k
391                else if vecLength >= n
392                then SOME (k+n)
393                else doCanInput'(rest, locker, n-vecLength, k+vecLength)
394            end
395        
396        |   doCanInput' (ref Truncated, _, _, k) = SOME k
397
398        |   doCanInput' (state as
399                            ref(readMore as ToRead (RD {chunkSize, readVecNB = SOME readVecNB, getPos, ...})),
400                         locker, n, k) =
401            let
402                val startPos =
403                   case getPos of SOME g => SOME(g()) | NONE => NONE
404            in
405               (* Read a block full.  This will avoid us creating lots of small items
406                  in the list if there is actually plenty of input available. *)
407               case readVecNB chunkSize of
408                    NONE => (* Reading these would block but we may already have some input. *)
409                        if k = 0 then NONE else SOME k
410                |   SOME data =>
411                    let (* We have to record this in the stream. *)
412                        val nextLink = ref readMore
413                        val nextChunk =
414                            HaveRead {vec = data, rest = nextLink, startPos = startPos}
415                    in
416                        state := nextChunk;
417                        (* Check whether this has satisfied the request. *)
418                        doCanInput'(state, locker, n, k)
419                    end
420            end
421
422        |   doCanInput' (ref(ToRead(RD {name, ...})), _, _, _) = 
423                (* readVecNB missing in reader. *)
424                raise Io { name = name, function = "canInput", cause = NonblockingNotSupported }
425
426        fun doCanInput (Uncommitted { state, locker }, n, k) =
427                LibraryIOSupport.protect locker doCanInput' (state, locker, n, k)
428        |   doCanInput (Committed { vec, rest, ... }, n, k) =
429            let
430                val vecLength = Vector.length vec
431            in
432                if vecLength = 0
433                then SOME k (* Reached EOF. *)
434                else if vecLength >= n
435                then SOME (k + n) (* Have already read enough. *)
436                else doCanInput(rest, n-vecLength, k+vecLength)
437            end
438    in
439        fun canInput(f, n) = if n < 0 then raise Size else doCanInput(f, n, 0)
440    end
441
442
443    (* Look for end-of-stream. Could be defined more directly
444       but it probably isn't worth it. *)
445    fun endOfStream f =
446    let
447        val (v, _) = input f
448    in
449        Vector.length v = 0
450    end
451
452
453    (* OUTPUT *)
454    (* In order to be able to flush and close the streams when we exit
455       we need to keep a list of the output streams.
456       One unfortunate side-effect of this is that the RTS can't
457       garbage-collect output streams since there will always be
458       a reference to a stream until it is explicitly closed.
459       It could be worth using a weak reference here but that
460       requires either a separate thread or some way of registering
461       a function to be called to check the list.  *)
462    val ostreamLock = Thread.Mutex.mutex()
463    (* Use a no-overwrite ref for the list of streams.  This ensures that
464       the ref will not be overwritten if we load a saved state. *)
465    val outputStreamList: outstream list ref = LibrarySupport.noOverwriteRef nil;
466
467    fun protectOut f (outs as OutStream{locker, ...}) = LibraryIOSupport.protect locker f outs
468
469    fun mkOutstream'(wrtr as WR{chunkSize, ...}, buffMode) =
470    let
471        open Thread.Mutex
472        val strm =
473            OutStream{wrtr=wrtr,
474                      buffType=ref buffMode,
475                      buf=Array.array(chunkSize, someElem),
476                      isTerm=ref false,
477                      bufp=ref 0,
478                      locker=Thread.Mutex.mutex()}
479    in
480        (* Add it to the list. *)
481        outputStreamList := strm :: ! outputStreamList;
482        strm
483    end
484    
485    val mkOutstream = LibraryIOSupport.protect ostreamLock mkOutstream'
486
487    fun getBufferMode(OutStream{buffType=ref b, ...}) = b
488
489    local
490        (* Flush anything from the buffer. *)
491        fun flushOut'(OutStream{buf, bufp=bufp as ref endBuf,
492                               wrtr=wrtr as WR{name, ...}, ...}) =
493            if endBuf = 0 then () (* Nothing buffered *)
494            else case wrtr of
495                WR{writeArr=SOME wa, ...} =>
496                let
497                    fun flushBuff n =
498                    let
499                        val written =
500                            wa(ArraySlice.slice(buf, n, SOME(endBuf-n)))
501                            handle exn => raise mapToIo(exn, name, "flushOut")
502                    in
503                        if written+n = endBuf then ()
504                        else flushBuff(written+n)
505                    end
506                in
507                    (* Set the buffer to empty BEFORE writing anything.  If
508                       we get an asynchronous interrupt (ctrl-C) we want to
509                       lose data in preference to duplicating it. *)
510                    bufp := 0;
511                    flushBuff 0
512                end
513            |   _ =>
514                raise Io { name = name, function = "flushOut",
515                           cause = BlockingNotSupported }
516
517        (* Terminate a stream either because it has been closed or
518           because we have extracted the underlying writer. *)
519        fun terminateStream'(OutStream{isTerm=ref true, ...}) = () (* Nothing to do. *)
520          | terminateStream'(f as OutStream{isTerm, ...}) =
521            let
522                (* outstream is not an equality type but we can get the
523                   desired effect by comparing the isTerm references for
524                   equality (N.B. NOT their contents). *)
525                fun removeThis(OutStream{isTerm=isTerm', ...}) =
526                    isTerm' <> isTerm
527                open Thread.Mutex
528            in
529                isTerm := true;
530                lock ostreamLock;
531                outputStreamList := List.filter removeThis (!outputStreamList);
532                unlock ostreamLock;
533                flushOut' f
534            end;
535      
536        (* Close the stream.  It is safe to repeat this and we may need to close
537           the writer even if the stream is terminated. *)
538        fun closeOut'(f as OutStream{wrtr=WR{close, ...}, ...}) =
539            (
540            terminateStream' f;
541            close() (* Close the underlying writer. *)
542            )
543
544        (* Flush the stream, terminate it and return the underlying writer. *)
545        fun getWriter'(OutStream{wrtr=WR{name, ...}, isTerm=ref true, ...}) =
546            (* Already terminated. *)
547                raise Io { name = name, function = "getWriter",
548                           cause = ClosedStream }
549         |  getWriter'(f as OutStream{buffType, wrtr, ...}) =
550            (
551               terminateStream' f;
552               (wrtr, !buffType)
553            )
554
555        (* Set the buffer mode, possibly flushing the buffer as it does. *)
556        fun setBufferMode' newBuff (f as OutStream{buffType, bufp, ...}) =
557        (* Question: What if the stream is terminated? *)
558            (
559            if newBuff = NO_BUF andalso !bufp <> 0
560            then (* Flush pending output. *)
561                (* Switching from block to line buffering does not flush. *)
562                flushOut' f
563            else ();
564            buffType := newBuff
565            )
566
567        (* Internal function: Write a vector directly to the writer. It only
568           returns when the vector has been completely written.
569           "output" should work if the writer only provides writeArr so we
570           may have to use that if writeVec isn't there. *)
571        (* FOR TESTING: Put writeArr first. *)
572        fun writeVec(OutStream{wrtr=WR{writeVec=SOME wv, name, ...}, ...}, v, i, len) =
573        let
574            fun forceOut p =
575            let
576                val written = wv(VectorSlice.slice(v, p+i, SOME(len-p)))
577                    handle exn => raise mapToIo(exn, name, "output")
578            in
579                if written+p = len then ()
580                else forceOut(written+p)
581            end
582        in
583            forceOut 0
584        end
585        |  writeVec(OutStream{wrtr=WR{writeArr=SOME wa, name, ...}, ...}, v, i, len) =
586        let
587            val buffSize = 10
588            val buff = Array.array(buffSize, someElem);
589            fun forceOut p =
590            let
591                val toCopy = Int.min(len-p, buffSize)
592                val () =
593                   ArraySlice.copyVec{src=VectorSlice.slice(v, p+i, SOME toCopy), dst=buff, di=0}
594                val written = wa(ArraySlice.slice(buff, 0, SOME toCopy))
595                    handle exn => raise mapToIo(exn, name, "output")
596            in
597                if written+p = len then ()
598                else forceOut(written+p)
599            end
600        in
601            forceOut 0
602        end       
603        |   writeVec(OutStream{wrtr=WR{name, ...}, ...}, _, _, _) =
604                raise Io { name = name, function = "output",
605                           cause = BlockingNotSupported }
606    
607        (* Internal function. Write a vector to the stream using the start and
608           length provided. *)
609        fun outputVector _ (OutStream{isTerm=ref true, wrtr=WR{name, ...}, ...}) =
610            raise Io { name = name, function = "output", cause = ClosedStream }
611        |   outputVector (v, start, vecLen) (f as OutStream{buffType, buf, bufp, ...})  =
612        let
613            val buffLen = Array.length buf
614
615            fun arrayCopyVec{src: Vector.vector, si: int, len: int, dst: Array.array, di: int} =
616                ArraySlice.copyVec{src=VectorSlice.slice(src, si, SOME len), dst=dst, di=di};
617   
618            fun addVecToBuff () =
619                if vecLen < buffLen - !bufp
620                then (* Room in the buffer. *)
621                    (
622                    arrayCopyVec{src=v, si=start, len=vecLen, dst=buf, di= !bufp};
623                    bufp := !bufp + vecLen
624                    )
625                else
626                let
627                    val buffSpace = buffLen - !bufp
628                in
629                    (* Copy as much of the vector as will fit *)
630                    arrayCopyVec{src=v, si=start, len=buffSpace, dst=buf, di= !bufp};
631                    bufp := !bufp+buffSpace;
632                    (* TODO: Flushing the buffer ensures that all the
633                       buffer contents have been written.  We don't
634                       actually need that, what we need is for enough
635                       to have been written that we have space in the
636                       buffer for the rest of the vector. *)
637                    flushOut' f; (* Write it out. *)
638                    (* Copy the rest of the vector. *)
639                    arrayCopyVec{src=v, si=start+buffSpace, len=vecLen-buffSpace, dst=buf, di=0};
640                    bufp := vecLen-buffSpace
641                end (* addVecToBuff *)
642        in
643            if vecLen > buffLen
644            then (* If the vector is too large to put in the buffer we're
645                    going to have to write something out.  To reduce copying
646                    we simply flush the buffer and write the vector directly. *)
647                (flushOut' f; writeVec(f, v, start, vecLen))
648            else (* Try copying to the buffer. *)
649                if !buffType = IO.NO_BUF
650                then (* Write it directly *) writeVec(f, v, start, vecLen)
651                else (* Block or line buffering - add it to the buffer.
652                        Line buffering is treated as block buffering on binary
653                        streams and handled at the higher level for text streams. *)
654                    addVecToBuff()
655        end (* outputVec *)
656    
657        (* This could be defined in terms of outputVector but this is
658           likely to be much more efficient if we are buffering. *)
659        fun output1' _ (OutStream{isTerm=ref true, wrtr=WR{name, ...}, ...}) =
660            raise Io { name = name, function = "output1", cause = ClosedStream }
661         |  output1' c (f as OutStream{buffType, buf, bufp, ...}) =
662            if !buffType = IO.NO_BUF
663            then writeVec(f, Vector.fromList[c], 0, 1)
664            else (* Line or block buffering. *)
665            (
666                Array.update(buf, !bufp, c);
667                bufp := !bufp + 1;
668                if !bufp = Array.length buf then flushOut' f else ()
669            )
670
671        fun getPosOut'(f as OutStream{wrtr=WR{name, getPos=SOME getPos, ...}, ...}) =
672            (
673                flushOut' f;
674                OutPos(f, getPos()) handle exn => raise mapToIo(exn, name, "getPosOut")
675            )
676                
677        |   getPosOut'(OutStream{wrtr=WR{name, ...}, ...}) =
678                raise Io { name = name, function = "getPosOut",
679                           cause = RandomAccessNotSupported }
680    
681        fun setPosOut' p (f as OutStream{wrtr=WR{setPos=SOME setPos, ...}, ...}) =
682            (
683                flushOut' f;
684                setPos p;
685                f
686            )
687        |   setPosOut' _ (OutStream{wrtr=WR{name, ...}, ...}) =
688                raise Io { name = name, function = "setPosOut",
689                           cause = RandomAccessNotSupported }
690    in
691        fun output1(f, c) = protectOut (output1' c) f
692        fun output(f, v) = protectOut (outputVector(v, 0, Vector.length v)) f
693        val flushOut = protectOut flushOut'
694        val closeOut = protectOut closeOut'
695        val getWriter = protectOut getWriter'
696        fun setBufferMode(f, n) = protectOut (setBufferMode' n) f
697  
698        (* Exported function to output part of a vector.  Non-standard. *)
699        fun outputVec(f, slice) =
700            let
701                val (v, i, len) = VectorSlice.base slice
702            in
703                protectOut (outputVector(v, i, len)) f
704            end
705
706        val getPosOut = protectOut getPosOut'
707
708        fun setPosOut(OutPos(f, p)) = protectOut (setPosOut' p) f
709    end
710
711
712    fun filePosOut(OutPos(_, p)) = p
713
714    (* We need to set up a function to flush the streams when we
715       exit.  This has to be set up for every session so we set up
716       an entry function, which is persistent, to do it. *)
717    local
718        fun closeAll () =
719        (* Close all the streams.  closeOut removes the streams
720           from the list so we should end up with outputStreamList
721           being nil. *)
722            List.foldl (fn (s, ()) => closeOut s handle _ => ()) ()
723                (! outputStreamList)
724
725        fun doOnEntry () = OS.Process.atExit closeAll
726    in
727        val () = PolyML.onEntry doOnEntry;
728        val () = doOnEntry() (* Set it up for this session as well. *)
729    end
730
731    local
732        open PolyML
733        fun printWithName(s, name) =
734            PolyML.PrettyString(String.concat[s, "-\"", String.toString name, "\""])
735
736        fun prettyIn depth a (Committed { rest, ...}) =
737                prettyIn depth a rest
738        |   prettyIn _     _ (Uncommitted { state = ref streamState, ...}) =
739            let
740                fun prettyState Truncated =
741                        PolyML.PrettyString("Instream-truncated")
742                |   prettyState (HaveRead{ rest = ref rest, ...}) =
743                        prettyState rest
744                |   prettyState (ToRead(RD{name, ...})) =
745                        printWithName("Instream", name)
746            in
747                prettyState streamState
748            end
749
750        fun prettyOut _ _ (OutStream { wrtr = WR { name, ...}, ...}) =
751            printWithName("Outstream", name)
752    in
753        val () = addPrettyPrinter prettyIn
754        val () = addPrettyPrinter prettyOut
755    end
756end;
757
758(* Define the StreamIO functor in terms of BasicStreamIO to filter
759   out outputVec. *)
760(* This is non-standard.  According to G&R 2004 StreamIO does not take the slice structures as args. *)
761functor StreamIO(
762    structure PrimIO : PRIM_IO
763    structure Vector : MONO_VECTOR
764    structure Array : MONO_ARRAY
765    structure VectorSlice: MONO_VECTOR_SLICE
766    structure ArraySlice: MONO_ARRAY_SLICE
767    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
768    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
769    sharing type PrimIO.array = Array.array = ArraySlice.array
770    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
771    sharing type PrimIO.array_slice = ArraySlice.slice
772    val someElem : PrimIO.elem
773    ): STREAM_IO =
774struct
775    structure StreamIO =
776        BasicStreamIO(structure PrimIO = PrimIO
777                      and Vector = Vector
778                      and Array = Array
779                      and VectorSlice = VectorSlice
780                      and ArraySlice = ArraySlice
781                      val someElem = someElem)
782    open StreamIO
783end;
784