(*
    Title:      Standard Basis Library: StreamIO functor
    Copyright   David C.J. Matthews 2000, 2005, 2019

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License version 2.1 as published by the Free Software Foundation.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* BasicStreamIO builds the STREAM_IO operations (functional input streams
   and buffered output streams) on top of a PRIM_IO reader/writer.  In
   addition to STREAM_IO it exports outputVec, which writes a vector slice. *)
functor BasicStreamIO(
    structure PrimIO : PRIM_IO
    structure Vector : MONO_VECTOR
    structure Array : MONO_ARRAY
    structure VectorSlice: MONO_VECTOR_SLICE
    structure ArraySlice: MONO_ARRAY_SLICE
    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
    sharing type PrimIO.array = Array.array = ArraySlice.array
    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
    sharing type PrimIO.array_slice = ArraySlice.slice
    val someElem : PrimIO.elem
    ):
    sig
    include STREAM_IO
    (* Note: This is non-standard but enables us to define
       the derived BinIO and TextIO structures more efficiently. *)
    val outputVec: outstream * PrimIO.vector_slice -> unit
    end =
struct
    open IO
    type vector = Vector.vector
    type elem = PrimIO.elem
    datatype reader = datatype PrimIO.reader
    datatype writer = datatype PrimIO.writer
    type array = Array.array
    type pos = PrimIO.pos

    exception Interrupt = RunCall.Interrupt

    (* Called after any exception in the lower level reader or
       writer to map any exception other than Io into Io.
       Io and Interrupt are passed through unchanged. *)
    fun mapToIo (io as Io _, _, _) = io
      | mapToIo (Interrupt, _, _) = Interrupt
      | mapToIo (nonIo, name, caller) =
            Io { name = name, function = caller, cause = nonIo }

    val emptyVec = Vector.fromList [] (* Represents end-of-stream. *)

    datatype instream =
        (* The usual state of a stream: We may have to read from the reader
           before we have any real data or we may have already read. *)
        Uncommitted of { state: streamState ref,
                         locker: Thread.Mutex.mutex }
        (* If we know we have unread input we can return this as the
           stream.  That allows part of the stream to be read without
           locking.  This is an optimisation. *)
      | Committed of
           { vec: vector, offset: int, rest: instream, startPos: pos option }

    and streamState =
        Truncated (* The stream has been closed or truncated. *)
      | HaveRead of (* A vector has been read from the stream.  If the
                       vector has size zero this is treated as EOF.
                       startPos is the position when the vector was
                       read. *)
            {vec: vector, rest: streamState ref, startPos: pos option }
      | ToRead of reader (* We have not yet closed or truncated the stream. *)


    (* Outstream. *)
    and outstream =
        OutStream of {
            wrtr: writer,
            buffType : IO.buffer_mode ref,
            buf: array,
            bufp: int ref,
            streamState: outstreamState ref,
            locker: Thread.Mutex.mutex
            }

    (* Stream state.
       OutStreamOpen means that attempts to write should proceed.
       OutStreamTerminated means the stream has been "terminated" i.e. buffers
       have been flushed and the writer has been extracted by getWriter.
       Any attempt to write at this level should fail.
       OutStreamClosed means that the writer's "close" function has been called.
       In addition the stream has been terminated.*)
    and outstreamState = OutStreamOpen | OutStreamTerminated | OutStreamClosed

    datatype out_pos = OutPos of outstream * pos

    (* Create a new stream from the vector and the reader. *)
    fun mkInstream (r, v: vector): instream =
    let
        val readRest =
            Uncommitted { state = ref (ToRead r), locker = Thread.Mutex.mutex() }
        (* If the vector is non-empty the first entry is as though the
           vector had been read otherwise it's just the reader. *)
    in
        if Vector.length v = 0
        then readRest
        else Committed { vec = v, offset = 0, rest = readRest, startPos = NONE }
    end

    local
        (* Return the next chunk and the stream that follows it.  The caller
           (input) runs this under the stream's lock. *)
        fun input' (ref (HaveRead {vec, rest, ...}), locker) =
            (* TODO: If we have already read further on we could convert
               these entries to Committed. *)
            (vec, Uncommitted{ state = rest, locker = locker })

          | input' (s as ref Truncated, locker) = (* Truncated: return end-of-stream *)
            (emptyVec, Uncommitted{ state = s, locker = locker })

          | input' (state as
                        ref(readMore as ToRead (RD {chunkSize, readVec = SOME readVec, getPos, ...})),
                    locker) =
            let
                (* We've not yet read this.  Try reading from the reader. *)
                val startPos =
                    case getPos of SOME g => SOME(g()) | NONE => NONE
                val data = readVec chunkSize
                (* Create a reference to the reader which will be updated by
                   the next read.  The ref is shared between the existing stream
                   and the new one so reading on either adds to the same chain. *)
                val nextLink = ref readMore
                val nextChunk =
                    HaveRead {vec = data, rest = nextLink, startPos = startPos}
            in
                (* Extend the stream by adding this vector to the list of chunks read so far. *)
                state := nextChunk;
                (* Return a new stream which continues reading. *)
                (data, Uncommitted { state = nextLink, locker = locker })
            end

          | input' (ref(ToRead(RD{name, ...})), _) =
                (* readVec missing in reader. *)
                raise Io { name = name, function = "input", cause = BlockingNotSupported }

        (* Collect a list of vectors holding up to n elements in total,
           stopping early at an end-of-stream (empty) chunk. *)
        fun inputNList' (ref (HaveRead {vec, rest, startPos}), locker, n) =
            let
                val vecLength = Vector.length vec
            in
                if vecLength = 0 (* End-of-stream: Return next in list. *)
                then ([vec], Uncommitted{ state = rest, locker = locker })
                else if n < vecLength
                then (* We can use what's already been read.  The stream we return allows us
                        to read the rest without blocking. *)
                    ([VectorSlice.vector(VectorSlice.slice(vec, 0, SOME n))],
                     Committed{ vec = vec, offset = n, startPos = startPos,
                                rest = Uncommitted{ state = rest, locker = locker} })
                else if n = vecLength
                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
                    ([vec], Uncommitted{ state = rest, locker = locker})
                else (* Have to get the next item *)
                let
                    val (nextVecs, nextStream) = inputNList' (rest, locker, n - vecLength)
                in
                    (vec :: nextVecs, nextStream)
                end
            end

          | inputNList' (s as ref Truncated, locker, _) =
                (* Truncated: return end-of-stream *)
                ([emptyVec], Uncommitted{ state = s, locker = locker })

          | inputNList' (f, locker, n) = (* ToRead *)
            let
                val (vec, f') = input' (f, locker)
            in
                if Vector.length vec = 0
                then ([vec], f') (* Truncated or end-of-file. *)
                else inputNList' (f, locker, n) (* Reread *)
            end

    in
        fun input (Uncommitted { state, locker }) =
                LibraryIOSupport.protect locker input' (state, locker)

          | input (Committed { vec, offset, rest, ... }) =
                (* This stream was produced from re-reading a stream that already
                   had data.  We can return the result without the overhead of locking. *)
                (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)), rest)

        fun inputNList (Uncommitted { state, locker }, n) =
                LibraryIOSupport.protect locker inputNList' (state, locker, n)

          | inputNList (Committed { vec, offset, rest, startPos }, n) =
            let
                val vecLength = Vector.length vec
            in
                if vecLength = 0 (* End-of-stream: Return next in list. *)
                then ([vec], rest)
                else if n < vecLength - offset
                then (* We can use what's already been read.  Next entry is a committed
                        stream that returns the part we haven't yet used. *)
                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, SOME n))],
                     Committed{ vec = vec, offset = offset+n, rest = rest, startPos = startPos })
                else if n = vecLength - offset
                then (* Exactly uses up the buffer.  New stream state is the next entry. *)
                    ([VectorSlice.vector(VectorSlice.slice(vec, offset, NONE))], rest)
                else (* Have to get the next item *)
                let
                    val (nextVecs, nextStream) = inputNList (rest, n - (vecLength - offset))
                in
                    (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)) :: nextVecs,
                     nextStream)
                end
            end

        fun inputN (f, n) =
            if n < 0
            then raise Size
            else if n = 0 (* Defined to return the empty vector and f *)
            then (emptyVec, f)
            else
            let
                val (vecs, f') = inputNList (f, n)
            in
                (Vector.concat vecs, f')
            end

        (* Read the whole of the remaining input until we get an EOF. *)
        fun inputAll f =
        let
            (* Find out the size of the file. *)
            fun getSize(n, f) =
            let
                val (v, f') = input f
                val vSize = Vector.length v
            in
                if vSize = 0
                then n (* Reached EOF. *)
                else getSize (n + vSize, f')
            end
        in
            (* Read the whole file. *)
            inputN(f, getSize(0,f))
        end

        (* Note a crucial difference between inputN and input1.  Because input1
           does not return a stream if it detects EOF it cannot advance beyond
           a temporary EOF in a stream. *)
        fun input1 (Committed { vec, offset, rest, startPos }) =
            let
                val vecSize = Vector.length vec
            in
                if vecSize = 0
                then NONE
                else if vecSize = offset+1
                then SOME(Vector.sub(vec, offset), rest)
                else SOME(Vector.sub(vec, offset),
                          Committed{ vec = vec, offset = offset+1, rest = rest, startPos = startPos })
            end

          | input1 f =
            let
                val (s, f') = inputN (f, 1)
            in
                if Vector.length s = 0
                then NONE
                else SOME(Vector.sub(s, 0), f')
            end

    end

    local
        (* Walk to the end of the chain of chunks already read; if a reader is
           still attached mark the stream Truncated and call its close. *)
        fun doClose (ref (HaveRead {rest, ...})) = doClose rest
          | doClose (ref Truncated) = ()
          | doClose (state as ref (ToRead (RD{close, name, ...}))) =
                (state := Truncated; close() handle exn => raise mapToIo(exn, name, "closeIn"))
    in
        fun closeIn (Uncommitted { state, locker }) = LibraryIOSupport.protect locker doClose state
          | closeIn (Committed { rest, ...}) = closeIn rest
    end

    local
        (* Return the reader. *)
        fun getReader' (ref (HaveRead {rest, ...})) = getReader' rest
          | getReader' (ref Truncated) =
                raise Io { name = "", function = "getReader", cause = ClosedStream }
          | getReader' (state as ref (ToRead reader)) =
                (state := Truncated; reader)
    in
        fun getReader'' (Uncommitted { state, locker }) =
                LibraryIOSupport.protect locker getReader' state
          | getReader'' (Committed { rest, ... }) = getReader'' rest

        fun getReader f =
        let
            val reader = getReader'' f
            val (allInput, _) = inputAll f
        in
            (* Return the reader along with buffered input.  It's not clear
               what to do if there are EOFs in the stream.  The book says the
               result is the result of inputAll which takes everything up to the
               first EOF. *)
            (reader, allInput)
        end
    end

    local
        (* Check that the stream is not terminated and then convert a file position
           plus a vector offset into a file position.  In particular, if the reader
           has converted CRNL into NL we don't have a simple relationship between
           elements and file offsets. *)
        fun findPosition'(startPos, offset, HaveRead {rest=ref rest, ...}) =
                findPosition'(startPos, offset, rest)
          | findPosition'(_, _, Truncated) =
                raise Io { name = "", function = "filePosIn", cause = ClosedStream }
          | findPosition'(startPos, offset,
                ToRead (RD { getPos = SOME getPos, setPos = SOME setPos,
                             readVec = SOME readVec, ...})) =
                if offset = 0
                then startPos (* Easy *)
                else
                    (* When we read this vector we recorded the file position of
                       the beginning only.  To find the file position of the
                       particular element we actually need to read the portion of
                       the input up to that element and find out the file position
                       at that point. *)
                let
                    val savep = getPos() (* Save current position. *)
                    (* Move to the point where we read the vector. *)
                    val () = setPos startPos;
                    (* Call readVec until we have read the required number
                       of elements.  N.B.  Ganser & Reppy has a bug here.
                       There is no guarantee that readVec n will actually
                       return n elements so it's unsafe to assume that it
                       will move the file pointer by n elements. *)
                    fun doRead n =
                    let
                        val read = Vector.length(readVec n)
                    in
                        if read = n orelse read = 0 (* Error? *)
                        then ()
                        else doRead (n - read)
                    end
                    (* Read the offset number of elements. *)
                    val () = doRead offset;
                    (* Record the position after actually reading the elements. *)
                    val position = getPos();
                in
                    setPos savep; (* Restore. *)
                    position
                end
          | findPosition'(_, _, ToRead _) =
                raise Io { name = "", function = "filePosIn",
                           cause = RandomAccessNotSupported }

        fun findPosition(startPos, offset, Committed { rest, ... }) =
                findPosition(startPos, offset, rest)
          | findPosition(startPos, offset, Uncommitted { state = ref state, locker }) =
                LibraryIOSupport.protect locker findPosition' (startPos, offset, state)

        fun filePosIn' (HaveRead {rest=ref rest, startPos = SOME startPos, ...}) =
                findPosition'(startPos, 0, rest)
          | filePosIn' (HaveRead {startPos = NONE, ...}) =
                raise Io { name = "", function = "filePosIn",
                           cause = RandomAccessNotSupported }
          | filePosIn' Truncated =
                raise Io { name = "", function = "filePosIn", cause = ClosedStream }
          | filePosIn' (ToRead(RD { getPos = SOME getPos, ...})) = getPos()
          | filePosIn' (ToRead _) =
                raise Io { name = "", function = "filePosIn",
                           cause = RandomAccessNotSupported }

    in
        (* Find the first entry to get the position. *)
        fun filePosIn (Uncommitted { state = ref state, locker }) =
                LibraryIOSupport.protect locker filePosIn' state
          | filePosIn (Committed { offset, rest, startPos = SOME startPos, ... }) =
                findPosition(startPos, offset, rest)
          | filePosIn (Committed { startPos = NONE, ... }) =
                (* This can occur either because the reader doesn't support getPos or
                   because the position is within the initial vector passed to
                   mkInstream. *)
                raise Io { name = "", function = "filePosIn",
                           cause = RandomAccessNotSupported }
    end

    local
        (* n is the number of elements still wanted; k is the number already
           known to be available.  The result is NONE if nothing is available
           without blocking, otherwise SOME of the count found (at most the
           number originally requested). *)
        fun doCanInput' (ref (HaveRead {vec, rest, ...}), locker, n, k) =
            let
                val vecLength = Vector.length vec
            in
                if vecLength = 0
                then SOME k
                else if vecLength >= n
                then SOME (k+n)
                else doCanInput'(rest, locker, n-vecLength, k+vecLength)
            end

          | doCanInput' (ref Truncated, _, _, k) = SOME k

          | doCanInput' (state as
                            ref(readMore as ToRead (RD {chunkSize, readVecNB = SOME readVecNB, getPos, ...})),
                         locker, n, k) =
            let
                val startPos =
                    case getPos of SOME g => SOME(g()) | NONE => NONE
            in
                (* Read a block full.  This will avoid us creating lots of small items
                   in the list if there is actually plenty of input available. *)
                case readVecNB chunkSize of
                    NONE => (* Reading these would block but we may already have some input. *)
                        if k = 0 then NONE else SOME k
                  | SOME data =>
                    let (* We have to record this in the stream. *)
                        val nextLink = ref readMore
                        val nextChunk =
                            HaveRead {vec = data, rest = nextLink, startPos = startPos}
                    in
                        state := nextChunk;
                        (* Check whether this has satisfied the request. *)
                        doCanInput'(state, locker, n, k)
                    end
            end

          | doCanInput' (ref(ToRead(RD {name, ...})), _, _, _) =
                (* readVecNB missing in reader. *)
                raise Io { name = name, function = "canInput", cause = NonblockingNotSupported }

        fun doCanInput (Uncommitted { state, locker }, n, k) =
                LibraryIOSupport.protect locker doCanInput' (state, locker, n, k)
          | doCanInput (Committed { vec, offset, rest, ... }, n, k) =
            let
                val vecLength = Vector.length vec
                (* Only the elements from offset onwards are still unread.
                   Counting the whole vector would report elements that have
                   already been consumed as available without blocking. *)
                val available = vecLength - offset
            in
                if vecLength = 0
                then SOME k (* Reached EOF. *)
                else if available >= n
                then SOME (k + n) (* Have already read enough. *)
                else doCanInput(rest, n-available, k+available)
            end
    in
        (* Raises Size if n is negative. *)
        fun canInput(f, n) = if n < 0 then raise Size else doCanInput(f, n, 0)
    end


    (* Look for end-of-stream.  Could be defined more directly
       but it probably isn't worth it. *)
    fun endOfStream f =
    let
        val (v, _) = input f
    in
        Vector.length v = 0
    end


    (* OUTPUT *)
    (* In order to be able to flush and close the streams when we exit
       we need to keep a list of the output streams. *)
    val ostreamLock = Thread.Mutex.mutex()
    (* We use a volatile ref so that the list is always reset
       at the start of a program. *)
    val outputStreamList: outstream list ref = LibrarySupport.volatileListRef()

    (* Run f on the stream while holding the stream's own lock. *)
    fun protectOut f (outs as OutStream{locker, ...}) = LibraryIOSupport.protect locker f outs

    fun mkOutstream'(wrtr as WR{chunkSize, ...}, buffMode) =
    let
        open Thread.Mutex
        val strm =
            OutStream{wrtr=wrtr,
                      buffType=ref buffMode,
                      buf=Array.array(chunkSize, someElem),
                      streamState=ref OutStreamOpen,
                      bufp=ref 0,
                      locker=Thread.Mutex.mutex()}
    in
        (* Add it to the list. *)
        outputStreamList := strm :: ! outputStreamList;
        strm
    end

    val mkOutstream = LibraryIOSupport.protect ostreamLock mkOutstream'

    fun getBufferMode(OutStream{buffType=ref b, ...}) = b

    local
        (* Flush anything from the buffer. *)
        fun flushOut'(OutStream{buf, bufp=bufp as ref endBuf,
                                wrtr=wrtr as WR{name, ...}, ...}) =
            if endBuf = 0 then () (* Nothing buffered *)
            else case wrtr of
                WR{writeArr=SOME wa, ...} =>
                let
                    fun flushBuff n =
                    let
                        val written =
                            wa(ArraySlice.slice(buf, n, SOME(endBuf-n)))
                                handle exn => raise mapToIo(exn, name, "flushOut")
                    in
                        if written+n = endBuf then ()
                        else flushBuff(written+n)
                    end
                in
                    (* Set the buffer to empty BEFORE writing anything.  If
                       we get an asynchronous interrupt (ctrl-C) we want to
                       lose data in preference to duplicating it. *)
                    bufp := 0;
                    flushBuff 0
                end
              | _ =>
                raise Io { name = name, function = "flushOut",
                           cause = BlockingNotSupported }

        (* Terminate a stream either because it has been closed or
           because we have extracted the underlying writer. *)
        fun terminateStream'(f as OutStream{streamState as ref OutStreamOpen, ...}) =
            let
                (* outstream is not an equality type but we can get the
                   desired effect by comparing the streamState references for
                   equality (N.B. NOT their contents). *)
                fun removeThis(OutStream{streamState=streamState', ...}) =
                    streamState' <> streamState
                open Thread.Mutex
            in
                streamState := OutStreamTerminated;
                lock ostreamLock;
                outputStreamList := List.filter removeThis (!outputStreamList);
                unlock ostreamLock;
                flushOut' f
            end
          | terminateStream' _ = () (* Nothing to do. *)

        (* Close the stream.  We must call the writer's close function only once
           unless the flushing fails.  In that case the stream is left open. *)
        fun closeOut'(OutStream{streamState=ref OutStreamClosed, ...}) = ()

          | closeOut'(f as OutStream{wrtr=WR{close, name, ...}, streamState, ...}) =
            (
                terminateStream' f;
                streamState := OutStreamClosed;
                close() handle exn => raise mapToIo(exn, name, "closeOut") (* Close the underlying writer. *)
            )

        (* Flush the stream, terminate it and return the underlying writer.
           According to the documentation this raises an exception if the stream
           is "closed" rather than "terminated" implying that it is possible to extract
           the writer more than once.  That's in contrast to getReader which is defined
           to raise an exception if the stream is closed or truncated. *)
        fun getWriter'(OutStream{wrtr=WR{name, ...}, streamState=ref OutStreamClosed, ...}) =
                (* Already closed. *)
                raise Io { name = name, function = "getWriter", cause = ClosedStream }
          | getWriter'(f as OutStream{buffType, wrtr, ...}) =
            (
                terminateStream' f;
                (wrtr, !buffType)
            )

        (* Set the buffer mode, possibly flushing the buffer as it does. *)
        fun setBufferMode' newBuff (f as OutStream{buffType, bufp, ...}) =
        (* Question: What if the stream is terminated? *)
            (
                if newBuff = NO_BUF andalso !bufp <> 0
                then (* Flush pending output. *)
                    (* Switching from block to line buffering does not flush. *)
                    flushOut' f
                else ();
                buffType := newBuff
            )

        (* Internal function: Write a vector directly to the writer.  It only
           returns when the vector has been completely written.
           "output" should work if the writer only provides writeArr so we
           may have to use that if writeVec isn't there. *)
        (* FOR TESTING: Put writeArr first. *)
        fun writeVec(OutStream{wrtr=WR{writeVec=SOME wv, name, ...}, ...}, v, i, len) =
            let
                (* p is the number of elements written so far. *)
                fun forceOut p =
                let
                    val written = wv(VectorSlice.slice(v, p+i, SOME(len-p)))
                        handle exn => raise mapToIo(exn, name, "output")
                in
                    if written+p = len then ()
                    else forceOut(written+p)
                end
            in
                forceOut 0
            end
          | writeVec(OutStream{wrtr=WR{writeArr=SOME wa, name, ...}, ...}, v, i, len) =
            let
                (* Copy through a small temporary array, buffSize elements at a time. *)
                val buffSize = 10
                val buff = Array.array(buffSize, someElem);
                fun forceOut p =
                let
                    val toCopy = Int.min(len-p, buffSize)
                    val () =
                        ArraySlice.copyVec{src=VectorSlice.slice(v, p+i, SOME toCopy), dst=buff, di=0}
                    val written = wa(ArraySlice.slice(buff, 0, SOME toCopy))
                        handle exn => raise mapToIo(exn, name, "output")
                in
                    if written+p = len then ()
                    else forceOut(written+p)
                end
            in
                forceOut 0
            end
          | writeVec(OutStream{wrtr=WR{name, ...}, ...}, _, _, _) =
                raise Io { name = name, function = "output",
                           cause = BlockingNotSupported }

        (* Internal function.  Write a vector to the stream using the start and
           length provided. *)
        fun outputVector (v, start, vecLen) (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...}) =
            let
                val buffLen = Array.length buf

                fun arrayCopyVec{src: Vector.vector, si: int, len: int, dst: Array.array, di: int} =
                    ArraySlice.copyVec{src=VectorSlice.slice(src, si, SOME len), dst=dst, di=di};

                fun addVecToBuff () =
                    if vecLen < buffLen - !bufp
                    then (* Room in the buffer. *)
                        (
                            arrayCopyVec{src=v, si=start, len=vecLen, dst=buf, di = !bufp};
                            bufp := !bufp + vecLen
                        )
                    else
                    let
                        val buffSpace = buffLen - !bufp
                    in
                        (* Copy as much of the vector as will fit *)
                        arrayCopyVec{src=v, si=start, len=buffSpace, dst=buf, di = !bufp};
                        bufp := !bufp+buffSpace;
                        (* TODO: Flushing the buffer ensures that all the
                           buffer contents have been written.  We don't
                           actually need that, what we need is for enough
                           to have been written that we have space in the
                           buffer for the rest of the vector. *)
                        flushOut' f; (* Write it out. *)
                        (* Copy the rest of the vector. *)
                        arrayCopyVec{src=v, si=start+buffSpace, len=vecLen-buffSpace, dst=buf, di=0};
                        bufp := vecLen-buffSpace
                    end (* addVecToBuff *)
            in
                if vecLen > buffLen
                then (* If the vector is too large to put in the buffer we're
                        going to have to write something out.  To reduce copying
                        we simply flush the buffer and write the vector directly. *)
                    (flushOut' f; writeVec(f, v, start, vecLen))
                else (* Try copying to the buffer. *)
                    if !buffType = IO.NO_BUF
                    then (* Write it directly *) writeVec(f, v, start, vecLen)
                    else (* Block or line buffering - add it to the buffer.
                            Line buffering is treated as block buffering on binary
                            streams and handled at the higher level for text streams. *)
                        addVecToBuff()
            end

            (* State was not open *)
          | outputVector _ (OutStream{wrtr=WR{name, ...}, ...}) =
                raise Io { name = name, function = "output", cause = ClosedStream }

        (* This could be defined in terms of outputVector but this is
           likely to be much more efficient if we are buffering. *)
        fun output1' c (f as OutStream{streamState=ref OutStreamOpen, buffType, buf, bufp, ...}) =
            if !buffType = IO.NO_BUF
            then writeVec(f, Vector.fromList[c], 0, 1)
            else (* Line or block buffering. *)
            (
                Array.update(buf, !bufp, c);
                bufp := !bufp + 1;
                if !bufp = Array.length buf then flushOut' f else ()
            )

            (* State was not open *)
          | output1' _ (OutStream{wrtr=WR{name, ...}, ...}) =
                raise Io { name = name, function = "output1", cause = ClosedStream }

        fun getPosOut'(f as OutStream{wrtr=WR{name, getPos=SOME getPos, ...}, ...}) =
            (
                flushOut' f;
                OutPos(f, getPos()) handle exn => raise mapToIo(exn, name, "getPosOut")
            )

          | getPosOut'(OutStream{wrtr=WR{name, ...}, ...}) =
                raise Io { name = name, function = "getPosOut",
                           cause = RandomAccessNotSupported }

        (* NOTE(review): unlike getPosOut' an exception from the writer's setPos
           is not passed through mapToIo here - confirm whether that is intended. *)
        fun setPosOut' p (f as OutStream{wrtr=WR{setPos=SOME setPos, ...}, ...}) =
            (
                flushOut' f;
                setPos p;
                f
            )
          | setPosOut' _ (OutStream{wrtr=WR{name, ...}, ...}) =
                raise Io { name = name, function = "setPosOut",
                           cause = RandomAccessNotSupported }
    in
        fun output1(f, c) = protectOut (output1' c) f
        fun output(f, v) = protectOut (outputVector(v, 0, Vector.length v)) f
        val flushOut = protectOut flushOut'
        val closeOut = protectOut closeOut'
        val getWriter = protectOut getWriter'
        fun setBufferMode(f, n) = protectOut (setBufferMode' n) f

        (* Exported function to output part of a vector.  Non-standard. *)
        fun outputVec(f, slice) =
        let
            val (v, i, len) = VectorSlice.base slice
        in
            protectOut (outputVector(v, i, len)) f
        end

        val getPosOut = protectOut getPosOut'

        fun setPosOut(OutPos(f, p)) = protectOut (setPosOut' p) f
    end


    fun filePosOut(OutPos(_, p)) = p

    (* We need to set up a function to flush the streams when we
       exit.  This has to be set up for every session so we set up
       an entry function, which is persistent, to do it. *)
    local
        fun closeAll () =
        (* Close all the streams.  closeOut removes the streams
           from the list so we should end up with outputStreamList
           being nil.  Failures to close an individual stream are
           deliberately ignored so the remaining streams are still closed. *)
            List.app (fn s => closeOut s handle _ => ())
                (! outputStreamList)

        fun doOnEntry () = OS.Process.atExit closeAll
    in
        val () = LibrarySupport.addOnEntry doOnEntry;
        val () = doOnEntry() (* Set it up for this session as well. *)
    end

    local
        open PolyML
        (* Build the printed form: the kind of stream followed by the quoted
           reader or writer name. *)
        fun printWithName(s, name) =
            PolyML.PrettyString(String.concat[s, "-\"", String.toString name, "\""])

        fun prettyIn depth a (Committed { rest, ...}) =
                prettyIn depth a rest
          | prettyIn _ _ (Uncommitted { state = ref streamState, ...}) =
            let
                fun prettyState Truncated =
                        PolyML.PrettyString("Instream-truncated")
                  | prettyState (HaveRead{ rest = ref rest, ...}) =
                        prettyState rest
                  | prettyState (ToRead(RD{name, ...})) =
                        printWithName("Instream", name)
            in
                prettyState streamState
            end

        fun prettyOut _ _ (OutStream { wrtr = WR { name, ...}, ...}) =
            printWithName("Outstream", name)
    in
        val () = addPrettyPrinter prettyIn
        val () = addPrettyPrinter prettyOut
    end
end;

(* Define the StreamIO functor in terms of BasicStreamIO to filter
   out outputVec. *)
(* This is non-standard.  According to G&R 2004 StreamIO does not take the slice structures as args. *)
functor StreamIO(
    structure PrimIO : PRIM_IO
    structure Vector : MONO_VECTOR
    structure Array : MONO_ARRAY
    structure VectorSlice: MONO_VECTOR_SLICE
    structure ArraySlice: MONO_ARRAY_SLICE
    sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem
    sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector
    sharing type PrimIO.array = Array.array = ArraySlice.array
    sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice
    sharing type PrimIO.array_slice = ArraySlice.slice
    val someElem : PrimIO.elem
    ): STREAM_IO =
struct
    structure StreamIO =
        BasicStreamIO(structure PrimIO = PrimIO
                      and Vector = Vector
                      and Array = Array
                      and VectorSlice = VectorSlice
                      and ArraySlice = ArraySlice
                      val someElem = someElem)
    open StreamIO
end;