1(*
2    Title:      Standard Basis Library: StreamIO functor
3    Copyright   David C.J. Matthews 2000, 2005, 2019
4
5    This library is free software; you can redistribute it and/or
6    modify it under the terms of the GNU Lesser General Public
7    License version 2.1 as published by the Free Software Foundation.
8    
9    This library is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    Lesser General Public License for more details.
13    
14    You should have received a copy of the GNU Lesser General Public
15    License along with this library; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
17*)
18
19functor BasicStreamIO(
20    structure PrimIO : PRIM_IO
21    structure Vector : MONO_VECTOR
22    structure Array : MONO_ARRAY
23    structure VectorSlice: MONO_VECTOR_SLICE
24    structure ArraySlice: MONO_ARRAY_SLICE
25    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
26    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
27    sharing type PrimIO.array = Array.array = ArraySlice.array
28    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
29    sharing type PrimIO.array_slice = ArraySlice.slice
30    val someElem : PrimIO.elem
31    ):
32    sig
33    include STREAM_IO
34    (* Note: This is non-standard but enables us to define
35       the derived BinIO and TextIO structures more efficiently. *)
36    val outputVec: outstream * PrimIO.vector_slice -> unit
37    end =
38struct
39    open IO
    (* Element and vector types handled by the streams, taken from the
       functor arguments. *)
    type vector = Vector.vector
    type elem = PrimIO.elem
    (* Datatype replication: makes the RD and WR constructors of the
       primitive reader and writer available for pattern matching below. *)
    datatype reader = datatype PrimIO.reader
    datatype writer = datatype PrimIO.writer
    type array = Array.array
    type pos = PrimIO.pos

    (* Short local name for the Interrupt exception defined in RunCall so
       that mapToIo can match on it. *)
    exception Interrupt = RunCall.Interrupt
48
49    (* Called after any exception in the lower level reader or
50       writer to map any exception other than Io into Io. *)
51    fun mapToIo (io as Io _, _, _) = io
52      | mapToIo (Interrupt, _, _) = Interrupt
53      | mapToIo (nonIo, name, caller) =
54            Io { name = name, function = caller, cause = nonIo }
55
56    val emptyVec = Vector.fromList [] (* Represents end-of-stream. *)
57
58    datatype instream =
59        (* The usual state of a stream: We may have to read from the reader
60           before we have any real data or we may have already read. *)
61        Uncommitted of { state: streamState ref,
62                         locker: Thread.Mutex.mutex }
63        (* If we know we have unread input we can return this as the
64           stream.  That allows part of the stream to be read without
65           locking.  This is an optimisation. *)
66      | Committed of
67           { vec: vector, offset: int, rest: instream, startPos: pos option }
68
69    and streamState =
70        Truncated (* The stream has been closed or truncated. *)
71    |   HaveRead of (* A vector has been read from the stream.  If the
72                       vector has size zero this is treated as EOF.
73                       startPos is the position when the vector was
74                       read. *)
75            {vec: vector, rest: streamState ref, startPos: pos option }
76    |   ToRead of reader (* We have not yet closed or truncated the stream. *)
77
78
79    (* Outstream. *)
80    and outstream =
81        OutStream of {
82            wrtr: writer,
83            buffType : IO.buffer_mode ref,
84            buf: array,
85            bufp: int ref,
86            streamState: outstreamState ref,
87            locker: Thread.Mutex.mutex
88            }
89    
90    (* Stream state.
91       OutStreamOpen means that attempts to write should proceed.
92       OutStreamTerminated means the stream has been "terminated" i.e. buffers
93       have been flushed and the writer has been extracted by getWriter.
94       Any attempt to write at this level should fail.
95       OutStreamClosed means that the writer's "close" function has been called.
96       In addition the stream has been terminated.*)
97    and outstreamState = OutStreamOpen | OutStreamTerminated | OutStreamClosed
98
    (* A position in an output stream: the stream itself paired with a
       position value of the underlying writer. *)
    datatype out_pos = OutPos of outstream * pos
100
101    (* Create a new stream from the vector and the reader. *)
102    fun mkInstream (r, v: vector): instream =
103    let
104        val readRest =
105           Uncommitted { state = ref (ToRead r), locker = Thread.Mutex.mutex() }
106        (* If the vector is non-empty the first entry is as though the
107           vector had been read otherwise it's just the reader. *)
108    in
109        if Vector.length v = 0
110        then readRest
111        else Committed { vec = v, offset = 0, rest = readRest, startPos = NONE }
112    end
113
114    local
115        fun input' (ref (HaveRead {vec, rest, ...}), locker) =
116            let
117                (* TODO: If we have already read further on we could convert
118                   these entries to Committed. *)
119            in
120                (vec, Uncommitted{ state = rest, locker = locker })
121            end
122
123          | input' (s as ref Truncated, locker) = (* Truncated: return end-of-stream *)
124                   (emptyVec, Uncommitted{ state = s, locker = locker })
125
126          | input' (state as
127                       ref(readMore as ToRead (RD {chunkSize, readVec = SOME readVec, getPos, ...})),
128                       locker) =
129            let
130                (* We've not yet read this.  Try reading from the reader. *)
131                val startPos =
132                   case getPos of SOME g => SOME(g()) | NONE => NONE
133                val data = readVec chunkSize
134                (* Create a reference to the reader which will be updated by
135                   the next read.  The ref is shared between the existing stream
136                   and the new one so reading on either adds to the same chain. *)
137                val nextLink = ref readMore
138                val nextChunk =
139                    HaveRead {vec = data, rest = nextLink, startPos = startPos}
140            in
141                (* Extend the stream by adding this vector to the list of chunks read so far. *)
142                state := nextChunk;
143                (* Return a new stream which continues reading. *)
144                (data, Uncommitted { state = nextLink, locker = locker })
145            end
146
147          | input' (ref(ToRead(RD{name, ...})), _) =
148                (* readVec missing in reader. *)
149                raise Io { name = name, function = "input", cause = BlockingNotSupported }
150
151         fun inputNList' (ref (HaveRead {vec, rest, startPos}), locker, n) =
152            let
153                val vecLength = Vector.length vec
154            in
155                if vecLength = 0 (* End-of-stream: Return next in list. *)
156                then ([vec], Uncommitted{ state = rest, locker = locker })
157                else if n < vecLength
158                then (* We can use what's already been read.  The stream we return allows us
159                        to read the rest without blocking. *)
160                    ([VectorSlice.vector(VectorSlice.slice(vec, 0, SOME n))],
161                     Committed{ vec = vec, offset = n, startPos = startPos,
162                         rest = Uncommitted{ state = rest, locker = locker}  })
163                else if n = vecLength
164                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
165                    ([vec], Uncommitted{ state = rest, locker = locker})
166                else (* Have to get the next item *)
167                let
168                    val (nextVecs, nextStream) = inputNList' (rest, locker, n - vecLength)
169                in
170                    (vec :: nextVecs, nextStream)
171                end
172            end
173
174         | inputNList' (s as ref Truncated, locker, _) =
175                (* Truncated: return end-of-stream *)
176                   ([emptyVec], Uncommitted{ state = s, locker = locker })
177   
178         | inputNList' (f, locker, n) = (* ToRead *)
179             let
180                 val (vec, f') = input' (f, locker)
181             in
182                 if Vector.length vec = 0
183                 then ([vec], f') (* Truncated or end-of-file. *)
184                 else inputNList' (f, locker, n) (* Reread *)
185             end
186
187    in
188        fun input (Uncommitted { state, locker }) =
189                LibraryIOSupport.protect locker input' (state, locker)
190
191        |   input (Committed { vec, offset, rest, ... }) =
192              (* This stream was produced from re-reading a stream that already
193                 had data.  We can return the result without the overhead of locking. *)
194                (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)), rest)
195
196        fun inputNList (Uncommitted { state, locker }, n) =
197                LibraryIOSupport.protect locker inputNList' (state, locker, n)
198
199        |   inputNList (Committed { vec, offset, rest, startPos }, n) =
200            let
201                val vecLength = Vector.length vec
202            in
203                if vecLength = 0 (* End-of-stream: Return next in list. *)
204                then ([vec], rest)
205                else if n < vecLength - offset
206                then (* We can use what's already been read.  Next entry is a committed
207                        stream that returns the part we haven't yet used. *)
208                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, SOME n))],
209                     Committed{ vec = vec, offset = offset+n, rest = rest, startPos = startPos })
210                else if n = vecLength - offset
211                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
212                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, NONE))], rest)
213                else (* Have to get the next item *)
214                let
215                    val (nextVecs, nextStream) = inputNList (rest, n - (vecLength - offset))
216                in
217                    (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)) :: nextVecs,
218                     nextStream)
219                end
220            end
221
222        fun inputN (f, n) =
223        if n < 0
224        then raise Size
225        else if n = 0 (* Defined to return the empty vector and f *)
226        then (emptyVec, f)
227        else
228        let
229            val (vecs, f') = inputNList (f, n)
230        in
231            (Vector.concat vecs, f')
232        end
233
234        (* Read the whole of the remaining input until we get an EOF. *)
235        fun inputAll f =
236        let
237            (* Find out the size of the file. *)
238            fun getSize(n, f) =
239            let
240                val (v, f') = input f
241                val vSize = Vector.length v
242            in
243                if vSize = 0
244                then n (* Reached EOF. *)
245                else getSize (n + vSize, f')
246            end
247        in
248            (* Read the whole file. *)
249            inputN(f, getSize(0,f))
250        end
251
252        (* Note a crucial difference between inputN and input1.  Because input1
253           does not return a stream if it detects EOF it cannot advance beyond
254           a temporary EOF in a stream. *)
255        fun input1 (Committed { vec, offset, rest, startPos }) =
256            let
257                val vecSize = Vector.length vec
258            in
259                if vecSize = 0
260                then NONE
261                else if vecSize = offset+1
262                then SOME(Vector.sub(vec, offset), rest)
263                else SOME(Vector.sub(vec, offset),
264                       Committed{ vec = vec, offset = offset+1, rest = rest, startPos = startPos })
265            end
266
267        |   input1 f =
268            let
269                val (s, f') = inputN (f, 1)
270            in
271                if Vector.length s = 0
272                then NONE
273                else SOME(Vector.sub(s, 0), f')
274            end
275
276    end
277
278    local
279        fun doClose (ref (HaveRead {rest, ...})) = doClose rest
280          | doClose (ref Truncated) = ()
281          | doClose (state as ref (ToRead (RD{close, name, ...}))) =
282              (state := Truncated; close() handle exn => raise mapToIo(exn, name, "closeIn"))
283    in
284        fun closeIn (Uncommitted { state, locker }) = LibraryIOSupport.protect locker doClose state
285          | closeIn (Committed { rest, ...}) = closeIn rest
286    end
287
288    local
289        (* Return the reader. *) 
290        fun getReader' (ref (HaveRead {rest, ...})) = getReader' rest
291        |   getReader' (ref Truncated) =
292                raise Io { name = "",  function = "getReader", cause = ClosedStream }
293        |   getReader' (state as ref (ToRead reader)) =
294                (state := Truncated; reader)
295    in
296        fun getReader'' (Uncommitted { state, locker }) =
297                LibraryIOSupport.protect locker getReader' state
298        |   getReader'' (Committed { rest, ... }) = getReader'' rest
299
300        fun getReader f =
301        let
302            val reader = getReader'' f
303            val (allInput, _) = inputAll f
304        in
305            (* Return the reader along with buffered input.  It's not clear
306               what to do if there are EOFs in the stream.  The book says the
307               result is the result of inputAll which takes everything up to the
308               first EOF.  *)
309            (reader, allInput)
310        end
311    end
312    
313    local
314        (* Check that the stream is not terminated and then convert a file position
315           plus a vector offset into a file position.  In particular, if the reader
316           has converted CRNL into NL we don't have a simple relationship between
317           elements and file offsets. *)
318        fun findPosition'(startPos, offset, HaveRead {rest=ref rest, ...}) =
319                findPosition'(startPos, offset, rest)
320        |   findPosition'(_, _, Truncated) =
321                raise Io { name = "",  function = "filePosIn", cause = ClosedStream }
322        |   findPosition'(startPos, offset,
323                ToRead (RD { getPos = SOME getPos, setPos = SOME setPos,
324                             readVec = SOME readVec, ...})) =
325                if offset = 0
326                then startPos (* Easy *)
327                else
328                    (* When we read this vector we recorded the file position of
329                       the beginning only.  To find the file position of the
330                       particular element we actually need to read the portion of
331                       the input up to that element and find out the file position
332                       at that point. *)
333                let
334                    val savep = getPos() (* Save current position. *)
335                    (* Move to the point where we read the vector. *)
336                    val () = setPos startPos;
337                    (* Call readVec until we have read the required number
338                       of elements.  N.B.  Ganser & Reppy has a bug here.
339                       There is no guarantee that readVec n will actually
340                       return n elements so it's unsafe to assume that it
341                       will move the file pointer by n elements. *)
342                    fun doRead n =
343                    let
344                        val read = Vector.length(readVec n)
345                    in
346                        if read = n orelse read = 0 (* Error? *)
347                        then ()
348                        else doRead (n - read)
349                    end
350                    (* Read the offset number of elements. *)
351                    val () = doRead offset;
352                    (* Record the position after actually reading the elements. *)
353                    val position = getPos();
354                in
355                    setPos savep; (* Restore. *)
356                    position
357                end
358        |   findPosition'(_, _, ToRead _) =
359                raise Io { name = "",  function = "filePosIn",
360                           cause = RandomAccessNotSupported }
361
362        fun findPosition(startPos, offset, Committed { rest, ... }) =
363                findPosition(startPos, offset, rest)
364        |   findPosition(startPos, offset, Uncommitted { state = ref state, locker }) =
365                LibraryIOSupport.protect locker findPosition' (startPos, offset, state)
366
367        fun filePosIn' (HaveRead {rest=ref rest, startPos = SOME startPos, ...}) =
368                findPosition'(startPos, 0, rest)
369        |   filePosIn' (HaveRead {startPos = NONE, ...}) =
370                raise Io { name = "",  function = "filePosIn",
371                           cause = RandomAccessNotSupported }
372        |   filePosIn' Truncated =
373                raise Io { name = "",  function = "filePosIn", cause = ClosedStream }
374        |   filePosIn' (ToRead(RD { getPos = SOME getPos, ...})) = getPos()
375        |   filePosIn' (ToRead _) =
376                raise Io { name = "",  function = "filePosIn",
377                           cause = RandomAccessNotSupported }
378
379    in
380        (* Find the first entry to get the position. *)
381        fun filePosIn (Uncommitted { state = ref state, locker }) =
382                LibraryIOSupport.protect locker filePosIn' state
383        |   filePosIn (Committed { offset, rest, startPos = SOME startPos, ... }) =
384                findPosition(startPos, offset, rest)
385        |   filePosIn (Committed { startPos = NONE, ... }) =
386              (* This can occur either because the reader doesn't support getPos or
387                 because the position is within the initial vector passed to
388                 mkInstream. *)
389                raise Io { name = "",  function = "filePosIn",
390                           cause = RandomAccessNotSupported }
391    end
392    
393    local
394        fun doCanInput' (ref (HaveRead {vec, rest, ...}), locker, n, k) =
395            let
396                val vecLength = Vector.length vec
397            in
398                if vecLength = 0
399                then SOME k
400                else if vecLength >= n
401                then SOME (k+n)
402                else doCanInput'(rest, locker, n-vecLength, k+vecLength)
403            end
404        
405        |   doCanInput' (ref Truncated, _, _, k) = SOME k
406
407        |   doCanInput' (state as
408                            ref(readMore as ToRead (RD {chunkSize, readVecNB = SOME readVecNB, getPos, ...})),
409                         locker, n, k) =
410            let
411                val startPos =
412                   case getPos of SOME g => SOME(g()) | NONE => NONE
413            in
414               (* Read a block full.  This will avoid us creating lots of small items
415                  in the list if there is actually plenty of input available. *)
416               case readVecNB chunkSize of
417                    NONE => (* Reading these would block but we may already have some input. *)
418                        if k = 0 then NONE else SOME k
419                |   SOME data =>
420                    let (* We have to record this in the stream. *)
421                        val nextLink = ref readMore
422                        val nextChunk =
423                            HaveRead {vec = data, rest = nextLink, startPos = startPos}
424                    in
425                        state := nextChunk;
426                        (* Check whether this has satisfied the request. *)
427                        doCanInput'(state, locker, n, k)
428                    end
429            end
430
431        |   doCanInput' (ref(ToRead(RD {name, ...})), _, _, _) = 
432                (* readVecNB missing in reader. *)
433                raise Io { name = name, function = "canInput", cause = NonblockingNotSupported }
434
435        fun doCanInput (Uncommitted { state, locker }, n, k) =
436                LibraryIOSupport.protect locker doCanInput' (state, locker, n, k)
437        |   doCanInput (Committed { vec, rest, ... }, n, k) =
438            let
439                val vecLength = Vector.length vec
440            in
441                if vecLength = 0
442                then SOME k (* Reached EOF. *)
443                else if vecLength >= n
444                then SOME (k + n) (* Have already read enough. *)
445                else doCanInput(rest, n-vecLength, k+vecLength)
446            end
447    in
448        fun canInput(f, n) = if n < 0 then raise Size else doCanInput(f, n, 0)
449    end
450
451
452    (* Look for end-of-stream. Could be defined more directly
453       but it probably isn't worth it. *)
454    fun endOfStream f =
455    let
456        val (v, _) = input f
457    in
458        Vector.length v = 0
459    end
460
461
    (* OUTPUT *)
    (* In order to be able to flush and close the streams when we exit
       we need to keep a list of the output streams.  ostreamLock is the
       mutex used when that list is updated (see mkOutstream and
       terminateStream'). *)
    val ostreamLock = Thread.Mutex.mutex()
    (* The list of output streams that have been created and not yet
       terminated.  We use a volatile ref so that the list is always reset
       at the start of a program. *)
    val outputStreamList: outstream list ref = LibrarySupport.volatileListRef()
469
470    fun protectOut f (outs as OutStream{locker, ...}) = LibraryIOSupport.protect locker f outs
471
472    fun mkOutstream'(wrtr as WR{chunkSize, ...}, buffMode) =
473    let
474        open Thread.Mutex
475        val strm =
476            OutStream{wrtr=wrtr,
477                      buffType=ref buffMode,
478                      buf=Array.array(chunkSize, someElem),
479                      streamState=ref OutStreamOpen,
480                      bufp=ref 0,
481                      locker=Thread.Mutex.mutex()}
482    in
483        (* Add it to the list. *)
484        outputStreamList := strm :: ! outputStreamList;
485        strm
486    end
487    
    (* Exported constructor: mkOutstream' run through LibraryIOSupport.protect
       with ostreamLock, since it updates outputStreamList. *)
    val mkOutstream = LibraryIOSupport.protect ostreamLock mkOutstream'
489
490    fun getBufferMode(OutStream{buffType=ref b, ...}) = b
491
492    local
493        (* Flush anything from the buffer. *)
494        fun flushOut'(OutStream{buf, bufp=bufp as ref endBuf,
495                               wrtr=wrtr as WR{name, ...}, ...}) =
496            if endBuf = 0 then () (* Nothing buffered *)
497            else case wrtr of
498                WR{writeArr=SOME wa, ...} =>
499                let
500                    fun flushBuff n =
501                    let
502                        val written =
503                            wa(ArraySlice.slice(buf, n, SOME(endBuf-n)))
504                            handle exn => raise mapToIo(exn, name, "flushOut")
505                    in
506                        if written+n = endBuf then ()
507                        else flushBuff(written+n)
508                    end
509                in
510                    (* Set the buffer to empty BEFORE writing anything.  If
511                       we get an asynchronous interrupt (ctrl-C) we want to
512                       lose data in preference to duplicating it. *)
513                    bufp := 0;
514                    flushBuff 0
515                end
516            |   _ =>
517                raise Io { name = name, function = "flushOut",
518                           cause = BlockingNotSupported }
519
520        (* Terminate a stream either because it has been closed or
521           because we have extracted the underlying writer. *)
522        fun terminateStream'(f as OutStream{streamState as ref OutStreamOpen, ...}) =
523            let
524                (* outstream is not an equality type but we can get the
525                   desired effect by comparing the streamState references for
526                   equality (N.B. NOT their contents). *)
527                fun removeThis(OutStream{streamState=streamState', ...}) =
528                    streamState' <> streamState
529                open Thread.Mutex
530            in
531                streamState := OutStreamTerminated;
532                lock ostreamLock;
533                outputStreamList := List.filter removeThis (!outputStreamList);
534                unlock ostreamLock;
535                flushOut' f
536            end
537        |   terminateStream' _ = () (* Nothing to do. *)
538      
539        (* Close the stream.  We must call the writer's close function only once
540           unless the flushing fails.  In that case the stream is left open.  *)
541        fun closeOut'(OutStream{streamState=ref OutStreamClosed, ...}) = ()
542
543        |   closeOut'(f as OutStream{wrtr=WR{close, name, ...}, streamState, ...}) =
544            (
545                terminateStream' f;
546                streamState := OutStreamClosed;
547                close() handle exn => raise mapToIo(exn, name, "closeOut") (* Close the underlying writer. *)
548            )
549
550        (* Flush the stream, terminate it and return the underlying writer.
551           According to the documentation this raises an exception if the stream
552           is "closed" rather than "terminated" implying that it is possible to extract
553           the writer more than once.  That's in contrast to getReader which is defined
554           to raise an exception if the stream is closed or truncated. *)
555        fun getWriter'(OutStream{wrtr=WR{name, ...}, streamState=ref OutStreamClosed, ...}) =
556            (* Already closed. *)
557                raise Io { name = name, function = "getWriter", cause = ClosedStream }
558         |  getWriter'(f as OutStream{buffType, wrtr, ...}) =
559            (
560               terminateStream' f;
561               (wrtr, !buffType)
562            )
563
564        (* Set the buffer mode, possibly flushing the buffer as it does. *)
565        fun setBufferMode' newBuff (f as OutStream{buffType, bufp, ...}) =
566        (* Question: What if the stream is terminated? *)
567            (
568            if newBuff = NO_BUF andalso !bufp <> 0
569            then (* Flush pending output. *)
570                (* Switching from block to line buffering does not flush. *)
571                flushOut' f
572            else ();
573            buffType := newBuff
574            )
575
576        (* Internal function: Write a vector directly to the writer. It only
577           returns when the vector has been completely written.
578           "output" should work if the writer only provides writeArr so we
579           may have to use that if writeVec isn't there. *)
580        (* FOR TESTING: Put writeArr first. *)
581        fun writeVec(OutStream{wrtr=WR{writeVec=SOME wv, name, ...}, ...}, v, i, len) =
582        let
583            fun forceOut p =
584            let
585                val written = wv(VectorSlice.slice(v, p+i, SOME(len-p)))
586                    handle exn => raise mapToIo(exn, name, "output")
587            in
588                if written+p = len then ()
589                else forceOut(written+p)
590            end
591        in
592            forceOut 0
593        end
594        |  writeVec(OutStream{wrtr=WR{writeArr=SOME wa, name, ...}, ...}, v, i, len) =
595        let
596            val buffSize = 10
597            val buff = Array.array(buffSize, someElem);
598            fun forceOut p =
599            let
600                val toCopy = Int.min(len-p, buffSize)
601                val () =
602                   ArraySlice.copyVec{src=VectorSlice.slice(v, p+i, SOME toCopy), dst=buff, di=0}
603                val written = wa(ArraySlice.slice(buff, 0, SOME toCopy))
604                    handle exn => raise mapToIo(exn, name, "output")
605            in
606                if written+p = len then ()
607                else forceOut(written+p)
608            end
609        in
610            forceOut 0
611        end       
612        |   writeVec(OutStream{wrtr=WR{name, ...}, ...}, _, _, _) =
613                raise Io { name = name, function = "output",
614                           cause = BlockingNotSupported }
615    
616        (* Internal function. Write a vector to the stream using the start and
617           length provided. *)
618        fun outputVector (v, start, vecLen) (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...})  =
619        let
620            val buffLen = Array.length buf
621
622            fun arrayCopyVec{src: Vector.vector, si: int, len: int, dst: Array.array, di: int} =
623                ArraySlice.copyVec{src=VectorSlice.slice(src, si, SOME len), dst=dst, di=di};
624   
625            fun addVecToBuff () =
626                if vecLen < buffLen - !bufp
627                then (* Room in the buffer. *)
628                    (
629                    arrayCopyVec{src=v, si=start, len=vecLen, dst=buf, di= !bufp};
630                    bufp := !bufp + vecLen
631                    )
632                else
633                let
634                    val buffSpace = buffLen - !bufp
635                in
636                    (* Copy as much of the vector as will fit *)
637                    arrayCopyVec{src=v, si=start, len=buffSpace, dst=buf, di= !bufp};
638                    bufp := !bufp+buffSpace;
639                    (* TODO: Flushing the buffer ensures that all the
640                       buffer contents have been written.  We don't
641                       actually need that, what we need is for enough
642                       to have been written that we have space in the
643                       buffer for the rest of the vector. *)
644                    flushOut' f; (* Write it out. *)
645                    (* Copy the rest of the vector. *)
646                    arrayCopyVec{src=v, si=start+buffSpace, len=vecLen-buffSpace, dst=buf, di=0};
647                    bufp := vecLen-buffSpace
648                end (* addVecToBuff *)
649        in
650            if vecLen > buffLen
651            then (* If the vector is too large to put in the buffer we're
652                    going to have to write something out.  To reduce copying
653                    we simply flush the buffer and write the vector directly. *)
654                (flushOut' f; writeVec(f, v, start, vecLen))
655            else (* Try copying to the buffer. *)
656                if !buffType = IO.NO_BUF
657                then (* Write it directly *) writeVec(f, v, start, vecLen)
658                else (* Block or line buffering - add it to the buffer.
659                        Line buffering is treated as block buffering on binary
660                        streams and handled at the higher level for text streams. *)
661                    addVecToBuff()
662        end
663
664            (* State was not open *)
665        |   outputVector _ (OutStream{wrtr=WR{name, ...}, ...}) =
666                raise Io { name = name, function = "output", cause = ClosedStream }
667    
668        (* This could be defined in terms of outputVector but this is
669           likely to be much more efficient if we are buffering. *)
670        fun output1' c (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...}) =
671            if !buffType = IO.NO_BUF
672            then writeVec(f, Vector.fromList[c], 0, 1)
673            else (* Line or block buffering. *)
674            (
675                Array.update(buf, !bufp, c);
676                bufp := !bufp + 1;
677                if !bufp = Array.length buf then flushOut' f else ()
678            )
679
680            (* State was not open *)
681        |   output1' _ (OutStream{wrtr=WR{name, ...}, ...}) =
682                raise Io { name = name, function = "output1", cause = ClosedStream }
683
684        fun getPosOut'(f as OutStream{wrtr=WR{name, getPos=SOME getPos, ...}, ...}) =
685            (
686                flushOut' f;
687                OutPos(f, getPos()) handle exn => raise mapToIo(exn, name, "getPosOut")
688            )
689                
690        |   getPosOut'(OutStream{wrtr=WR{name, ...}, ...}) =
691                raise Io { name = name, function = "getPosOut",
692                           cause = RandomAccessNotSupported }
693    
694        fun setPosOut' p (f as OutStream{wrtr=WR{setPos=SOME setPos, ...}, ...}) =
695            (
696                flushOut' f;
697                setPos p;
698                f
699            )
700        |   setPosOut' _ (OutStream{wrtr=WR{name, ...}, ...}) =
701                raise Io { name = name, function = "setPosOut",
702                           cause = RandomAccessNotSupported }
703    in
704        fun output1(f, c) = protectOut (output1' c) f
705        fun output(f, v) = protectOut (outputVector(v, 0, Vector.length v)) f
706        val flushOut = protectOut flushOut'
707        val closeOut = protectOut closeOut'
708        val getWriter = protectOut getWriter'
709        fun setBufferMode(f, n) = protectOut (setBufferMode' n) f
710  
711        (* Exported function to output part of a vector.  Non-standard. *)
712        fun outputVec(f, slice) =
713            let
714                val (v, i, len) = VectorSlice.base slice
715            in
716                protectOut (outputVector(v, i, len)) f
717            end
718
719        val getPosOut = protectOut getPosOut'
720
721        fun setPosOut(OutPos(f, p)) = protectOut (setPosOut' p) f
722    end
723
724
725    fun filePosOut(OutPos(_, p)) = p
726
727    (* We need to set up a function to flush the streams when we
728       exit.  This has to be set up for every session so we set up
729       an entry function, which is persistent, to do it. *)
730    local
731        fun closeAll () =
732        (* Close all the streams.  closeOut removes the streams
733           from the list so we should end up with outputStreamList
734           being nil. *)
735            List.foldl (fn (s, ()) => closeOut s handle _ => ()) ()
736                (! outputStreamList)
737
738        fun doOnEntry () = OS.Process.atExit closeAll
739    in
740        val () = LibrarySupport.addOnEntry doOnEntry;
741        val () = doOnEntry() (* Set it up for this session as well. *)
742    end
743
744    local
745        open PolyML
746        fun printWithName(s, name) =
747            PolyML.PrettyString(String.concat[s, "-\"", String.toString name, "\""])
748
749        fun prettyIn depth a (Committed { rest, ...}) =
750                prettyIn depth a rest
751        |   prettyIn _     _ (Uncommitted { state = ref streamState, ...}) =
752            let
753                fun prettyState Truncated =
754                        PolyML.PrettyString("Instream-truncated")
755                |   prettyState (HaveRead{ rest = ref rest, ...}) =
756                        prettyState rest
757                |   prettyState (ToRead(RD{name, ...})) =
758                        printWithName("Instream", name)
759            in
760                prettyState streamState
761            end
762
763        fun prettyOut _ _ (OutStream { wrtr = WR { name, ...}, ...}) =
764            printWithName("Outstream", name)
765    in
766        val () = addPrettyPrinter prettyIn
767        val () = addPrettyPrinter prettyOut
768    end
769end;
770
(* The StreamIO functor is BasicStreamIO constrained to STREAM_IO, which
   filters out the extra outputVec entry.
   Non-standard: according to G&R 2004 StreamIO does not take the slice
   structures as arguments. *)
functor StreamIO(
    structure PrimIO : PRIM_IO
    structure Vector : MONO_VECTOR
    structure Array : MONO_ARRAY
    structure VectorSlice: MONO_VECTOR_SLICE
    structure ArraySlice: MONO_ARRAY_SLICE
    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
    sharing type PrimIO.array = Array.array = ArraySlice.array
    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
    sharing type PrimIO.array_slice = ArraySlice.slice
    val someElem : PrimIO.elem
    ): STREAM_IO =
    (* Pass every argument straight through; the result signature above
       does the filtering. *)
    BasicStreamIO(
        structure PrimIO = PrimIO
        structure Vector = Vector
        structure Array = Array
        structure VectorSlice = VectorSlice
        structure ArraySlice = ArraySlice
        val someElem = someElem
    );
797