1(* 2 Title: Standard Basis Library: StreamIO functor 3 Copyright David C.J. Matthews 2000, 2005 4 5 This library is free software; you can redistribute it and/or 6 modify it under the terms of the GNU Lesser General Public 7 License as published by the Free Software Foundation; either 8 version 2.1 of the License, or (at your option) any later version. 9 10 This library is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 Lesser General Public License for more details. 14 15 You should have received a copy of the GNU Lesser General Public 16 License along with this library; if not, write to the Free Software 17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 18*) 19 20functor BasicStreamIO( 21 structure PrimIO : PRIM_IO 22 structure Vector : MONO_VECTOR 23 structure Array : MONO_ARRAY 24 structure VectorSlice: MONO_VECTOR_SLICE 25 structure ArraySlice: MONO_ARRAY_SLICE 26 sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem 27 sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector 28 sharing type PrimIO.array = Array.array = ArraySlice.array 29 sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice 30 sharing type PrimIO.array_slice = ArraySlice.slice 31 val someElem : PrimIO.elem 32 ): 33 sig 34 include STREAM_IO 35 (* Note: This is non-standard but enables us to define 36 the derived BinIO and TextIO structures more efficiently. *) 37 val outputVec: outstream * PrimIO.vector_slice -> unit 38 end = 39struct 40 open IO 41 type vector = Vector.vector 42 type elem = PrimIO.elem 43 datatype reader = datatype PrimIO.reader 44 datatype writer = datatype PrimIO.writer 45 type array = Array.array 46 type pos = PrimIO.pos 47 48 exception Interrupt = RunCall.Interrupt 49 50 (* Called after any exception in the lower level reader or 51 writer to map any exception other than Io into Io. *) 52 fun mapToIo (io as Io _, _, _) = io 53 | mapToIo (Interrupt, _, _) = Interrupt 54 | mapToIo (nonIo, name, caller) = 55 Io { name = name, function = caller, cause = nonIo } 56 57 val emptyVec = Vector.fromList [] (* Represents end-of-stream. *) 58 59 datatype instream = 60 (* The usual state of a stream: We may have to read from the reader 61 before we have any real data or we may have already read. *) 62 Uncommitted of { state: streamState ref, 63 locker: Thread.Mutex.mutex } 64 (* If we know we have unread input we can return this as the 65 stream. That allows part of the stream to be read without 66 locking. This is an optimisation. *) 67 | Committed of 68 { vec: vector, offset: int, rest: instream, startPos: pos option } 69 70 and streamState = 71 Truncated (* The stream has been closed or truncated. *) 72 | HaveRead of (* A vector has been read from the stream. If the 73 vector has size zero this is treated as EOF. 74 startPos is the position when the vector was 75 read. *) 76 {vec: vector, rest: streamState ref, startPos: pos option } 77 | ToRead of reader (* We have not yet closed or truncated the stream. *) 78 79 80 and outstream = 81 OutStream of { 82 wrtr: writer, 83 buffType : IO.buffer_mode ref, 84 buf: array, 85 bufp: int ref, 86 isTerm: bool ref, 87 locker: Thread.Mutex.mutex 88 } 89 90 datatype out_pos = OutPos of outstream * pos 91 92 (* Create a new stream from the vector and the reader. *) 93 fun mkInstream (r, v: vector): instream = 94 let 95 val readRest = 96 Uncommitted { state = ref (ToRead r), locker = Thread.Mutex.mutex() } 97 (* If the vector is non-empty the first entry is as though the 98 vector had been read otherwise it's just the reader. *) 99 in 100 if Vector.length v = 0 101 then readRest 102 else Committed { vec = v, offset = 0, rest = readRest, startPos = NONE } 103 end 104 105 local 106 fun input' (ref (HaveRead {vec, rest, ...}), locker) = 107 let 108 (* TODO: If we have already read further on we could convert 109 these entries to Committed. *) 110 in 111 (vec, Uncommitted{ state = rest, locker = locker }) 112 end 113 114 | input' (s as ref Truncated, locker) = (* Truncated: return end-of-stream *) 115 (emptyVec, Uncommitted{ state = s, locker = locker }) 116 117 | input' (state as 118 ref(readMore as ToRead (RD {chunkSize, readVec = SOME readVec, getPos, ...})), 119 locker) = 120 let 121 (* We've not yet read this. Try reading from the reader. *) 122 val startPos = 123 case getPos of SOME g => SOME(g()) | NONE => NONE 124 val data = readVec chunkSize 125 (* Create a reference to the reader which will be updated by 126 the next read. The ref is shared between the existing stream 127 and the new one so reading on either adds to the same chain. *) 128 val nextLink = ref readMore 129 val nextChunk = 130 HaveRead {vec = data, rest = nextLink, startPos = startPos} 131 in 132 (* Extend the stream by adding this vector to the list of chunks read so far. *) 133 state := nextChunk; 134 (* Return a new stream which continues reading. *) 135 (data, Uncommitted { state = nextLink, locker = locker }) 136 end 137 138 | input' (ref(ToRead(RD{name, ...})), _) = 139 (* readVec missing in reader. *) 140 raise Io { name = name, function = "input", cause = BlockingNotSupported } 141 142 fun inputNList' (ref (HaveRead {vec, rest, startPos}), locker, n) = 143 let 144 val vecLength = Vector.length vec 145 in 146 if vecLength = 0 (* End-of-stream: Return next in list. *) 147 then ([vec], Uncommitted{ state = rest, locker = locker }) 148 else if n < vecLength 149 then (* We can use what's already been read. The stream we return allows us 150 to read the rest without blocking. *) 151 ([VectorSlice.vector(VectorSlice.slice(vec, 0, SOME n))], 152 Committed{ vec = vec, offset = n, startPos = startPos, 153 rest = Uncommitted{ state = rest, locker = locker} }) 154 else if n = vecLength 155 then (* Exactly uses up the buffer. New stream state is the next entry. *) 156 ([vec], Uncommitted{ state = rest, locker = locker}) 157 else (* Have to get the next item *) 158 let 159 val (nextVecs, nextStream) = inputNList' (rest, locker, n - vecLength) 160 in 161 (vec :: nextVecs, nextStream) 162 end 163 end 164 165 | inputNList' (s as ref Truncated, locker, _) = 166 (* Truncated: return end-of-stream *) 167 ([emptyVec], Uncommitted{ state = s, locker = locker }) 168 169 | inputNList' (f, locker, n) = (* ToRead *) 170 let 171 val (vec, f') = input' (f, locker) 172 in 173 if Vector.length vec = 0 174 then ([vec], f') (* Truncated or end-of-file. *) 175 else inputNList' (f, locker, n) (* Reread *) 176 end 177 178 in 179 fun input (Uncommitted { state, locker }) = 180 LibraryIOSupport.protect locker input' (state, locker) 181 182 | input (Committed { vec, offset, rest, ... }) = 183 (* This stream was produced from re-reading a stream that already 184 had data. We can return the result without the overhead of locking. *) 185 (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)), rest) 186 187 fun inputNList (Uncommitted { state, locker }, n) = 188 LibraryIOSupport.protect locker inputNList' (state, locker, n) 189 190 | inputNList (Committed { vec, offset, rest, startPos }, n) = 191 let 192 val vecLength = Vector.length vec 193 in 194 if vecLength = 0 (* End-of-stream: Return next in list. *) 195 then ([vec], rest) 196 else if n < vecLength - offset 197 then (* We can use what's already been read. Next entry is a committed 198 stream that returns the part we haven't yet used. *) 199 ([VectorSlice.vector(VectorSlice.slice(vec, offset, SOME n))], 200 Committed{ vec = vec, offset = offset+n, rest = rest, startPos = startPos }) 201 else if n = vecLength - offset 202 then (* Exactly uses up the buffer. New stream state is the next entry. *) 203 ([VectorSlice.vector(VectorSlice.slice(vec, offset, NONE))], rest) 204 else (* Have to get the next item *) 205 let 206 val (nextVecs, nextStream) = inputNList (rest, n - (vecLength - offset)) 207 in 208 (VectorSlice.vector(VectorSlice.slice(vec, offset, NONE)) :: nextVecs, 209 nextStream) 210 end 211 end 212 213 fun inputN (f, n) = 214 if n < 0 215 then raise Size 216 else if n = 0 (* Defined to return the empty vector and f *) 217 then (emptyVec, f) 218 else 219 let 220 val (vecs, f') = inputNList (f, n) 221 in 222 (Vector.concat vecs, f') 223 end 224 225 (* Read the whole of the remaining input until we get an EOF. *) 226 fun inputAll f = 227 let 228 (* Find out the size of the file. *) 229 fun getSize(n, f) = 230 let 231 val (v, f') = input f 232 val vSize = Vector.length v 233 in 234 if vSize = 0 235 then n (* Reached EOF. *) 236 else getSize (n + vSize, f') 237 end 238 in 239 (* Read the whole file. *) 240 inputN(f, getSize(0,f)) 241 end 242 243 (* Note a crucial difference between inputN and input1. Because input1 244 does not return a stream if it detects EOF it cannot advance beyond 245 a temporary EOF in a stream. *) 246 fun input1 (Committed { vec, offset, rest, startPos }) = 247 let 248 val vecSize = Vector.length vec 249 in 250 if vecSize = 0 251 then NONE 252 else if vecSize = offset+1 253 then SOME(Vector.sub(vec, offset), rest) 254 else SOME(Vector.sub(vec, offset), 255 Committed{ vec = vec, offset = offset+1, rest = rest, startPos = startPos }) 256 end 257 258 | input1 f = 259 let 260 val (s, f') = inputN (f, 1) 261 in 262 if Vector.length s = 0 263 then NONE 264 else SOME(Vector.sub(s, 0), f') 265 end 266 267 end 268 269 local 270 fun doClose (ref (HaveRead {rest, ...})) = doClose rest 271 | doClose (ref Truncated) = () 272 | doClose (state as ref (ToRead (RD{close, ...}))) = 273 (state := Truncated; close()) 274 in 275 fun closeIn (Uncommitted { state, locker }) = LibraryIOSupport.protect locker doClose state 276 | closeIn (Committed { rest, ...}) = closeIn rest 277 end 278 279 local 280 (* Return the reader. *) 281 fun getReader' (ref (HaveRead {rest, ...})) = getReader' rest 282 | getReader' (ref Truncated) = 283 raise Io { name = "", function = "getReader", cause = ClosedStream } 284 | getReader' (state as ref (ToRead reader)) = 285 (state := Truncated; reader) 286 in 287 fun getReader'' (Uncommitted { state, locker }) = 288 LibraryIOSupport.protect locker getReader' state 289 | getReader'' (Committed { rest, ... }) = getReader'' rest 290 291 fun getReader f = 292 let 293 val reader = getReader'' f 294 val (allInput, _) = inputAll f 295 in 296 (* Return the reader along with buffered input. It's not clear 297 what to do if there are EOFs in the stream. The book says the 298 result is the result of inputAll which takes everything up to the 299 first EOF. *) 300 (reader, allInput) 301 end 302 end 303 304 local 305 (* Check that the stream is not terminated and then convert a file position 306 plus a vector offset into a file position. In particular, if the reader 307 has converted CRNL into NL we don't have a simple relationship between 308 elements and file offsets. *) 309 fun findPosition'(startPos, offset, HaveRead {rest=ref rest, ...}) = 310 findPosition'(startPos, offset, rest) 311 | findPosition'(_, _, Truncated) = 312 raise Io { name = "", function = "filePosIn", cause = ClosedStream } 313 | findPosition'(startPos, offset, 314 ToRead (RD { getPos = SOME getPos, setPos = SOME setPos, 315 readVec = SOME readVec, ...})) = 316 if offset = 0 317 then startPos (* Easy *) 318 else 319 (* When we read this vector we recorded the file position of 320 the beginning only. To find the file position of the 321 particular element we actually need to read the portion of 322 the input up to that element and find out the file position 323 at that point. *) 324 let 325 val savep = getPos() (* Save current position. *) 326 (* Move to the point where we read the vector. *) 327 val () = setPos startPos; 328 (* Call readVec until we have read the required number 329 of elements. N.B. Ganser & Reppy has a bug here. 330 There is no guarantee that readVec n will actually 331 return n elements so it's unsafe to assume that it 332 will move the file pointer by n elements. *) 333 fun doRead n = 334 let 335 val read = Vector.length(readVec n) 336 in 337 if read = n orelse read = 0 (* Error? *) 338 then () 339 else doRead (n - read) 340 end 341 (* Read the offset number of elements. *) 342 val () = doRead offset; 343 (* Record the position after actually reading the elements. *) 344 val position = getPos(); 345 in 346 setPos savep; (* Restore. *) 347 position 348 end 349 | findPosition'(_, _, ToRead _) = 350 raise Io { name = "", function = "filePosIn", 351 cause = RandomAccessNotSupported } 352 353 fun findPosition(startPos, offset, Committed { rest, ... }) = 354 findPosition(startPos, offset, rest) 355 | findPosition(startPos, offset, Uncommitted { state = ref state, locker }) = 356 LibraryIOSupport.protect locker findPosition' (startPos, offset, state) 357 358 fun filePosIn' (HaveRead {rest=ref rest, startPos = SOME startPos, ...}) = 359 findPosition'(startPos, 0, rest) 360 | filePosIn' (HaveRead {startPos = NONE, ...}) = 361 raise Io { name = "", function = "filePosIn", 362 cause = RandomAccessNotSupported } 363 | filePosIn' Truncated = 364 raise Io { name = "", function = "filePosIn", cause = ClosedStream } 365 | filePosIn' (ToRead(RD { getPos = SOME getPos, ...})) = getPos() 366 | filePosIn' (ToRead _) = 367 raise Io { name = "", function = "filePosIn", 368 cause = RandomAccessNotSupported } 369 370 in 371 (* Find the first entry to get the position. *) 372 fun filePosIn (Uncommitted { state = ref state, locker }) = 373 LibraryIOSupport.protect locker filePosIn' state 374 | filePosIn (Committed { offset, rest, startPos = SOME startPos, ... }) = 375 findPosition(startPos, offset, rest) 376 | filePosIn (Committed { startPos = NONE, ... }) = 377 (* This can occur either because the reader doesn't support getPos or 378 because the position is within the initial vector passed to 379 mkInstream. *) 380 raise Io { name = "", function = "filePosIn", 381 cause = RandomAccessNotSupported } 382 end 383 384 local 385 fun doCanInput' (ref (HaveRead {vec, rest, ...}), locker, n, k) = 386 let 387 val vecLength = Vector.length vec 388 in 389 if vecLength = 0 390 then SOME k 391 else if vecLength >= n 392 then SOME (k+n) 393 else doCanInput'(rest, locker, n-vecLength, k+vecLength) 394 end 395 396 | doCanInput' (ref Truncated, _, _, k) = SOME k 397 398 | doCanInput' (state as 399 ref(readMore as ToRead (RD {chunkSize, readVecNB = SOME readVecNB, getPos, ...})), 400 locker, n, k) = 401 let 402 val startPos = 403 case getPos of SOME g => SOME(g()) | NONE => NONE 404 in 405 (* Read a block full. This will avoid us creating lots of small items 406 in the list if there is actually plenty of input available. *) 407 case readVecNB chunkSize of 408 NONE => (* Reading these would block but we may already have some input. *) 409 if k = 0 then NONE else SOME k 410 | SOME data => 411 let (* We have to record this in the stream. *) 412 val nextLink = ref readMore 413 val nextChunk = 414 HaveRead {vec = data, rest = nextLink, startPos = startPos} 415 in 416 state := nextChunk; 417 (* Check whether this has satisfied the request. *) 418 doCanInput'(state, locker, n, k) 419 end 420 end 421 422 | doCanInput' (ref(ToRead(RD {name, ...})), _, _, _) = 423 (* readVecNB missing in reader. *) 424 raise Io { name = name, function = "canInput", cause = NonblockingNotSupported } 425 426 fun doCanInput (Uncommitted { state, locker }, n, k) = 427 LibraryIOSupport.protect locker doCanInput' (state, locker, n, k) 428 | doCanInput (Committed { vec, rest, ... }, n, k) = 429 let 430 val vecLength = Vector.length vec 431 in 432 if vecLength = 0 433 then SOME k (* Reached EOF. *) 434 else if vecLength >= n 435 then SOME (k + n) (* Have already read enough. *) 436 else doCanInput(rest, n-vecLength, k+vecLength) 437 end 438 in 439 fun canInput(f, n) = if n < 0 then raise Size else doCanInput(f, n, 0) 440 end 441 442 443 (* Look for end-of-stream. Could be defined more directly 444 but it probably isn't worth it. *) 445 fun endOfStream f = 446 let 447 val (v, _) = input f 448 in 449 Vector.length v = 0 450 end 451 452 453 (* OUTPUT *) 454 (* In order to be able to flush and close the streams when we exit 455 we need to keep a list of the output streams. 456 One unfortunate side-effect of this is that the RTS can't 457 garbage-collect output streams since there will always be 458 a reference to a stream until it is explicitly closed. 459 It could be worth using a weak reference here but that 460 requires either a separate thread or some way of registering 461 a function to be called to check the list. *) 462 val ostreamLock = Thread.Mutex.mutex() 463 (* Use a no-overwrite ref for the list of streams. This ensures that 464 the ref will not be overwritten if we load a saved state. *) 465 val outputStreamList: outstream list ref = LibrarySupport.noOverwriteRef nil; 466 467 fun protectOut f (outs as OutStream{locker, ...}) = LibraryIOSupport.protect locker f outs 468 469 fun mkOutstream'(wrtr as WR{chunkSize, ...}, buffMode) = 470 let 471 open Thread.Mutex 472 val strm = 473 OutStream{wrtr=wrtr, 474 buffType=ref buffMode, 475 buf=Array.array(chunkSize, someElem), 476 isTerm=ref false, 477 bufp=ref 0, 478 locker=Thread.Mutex.mutex()} 479 in 480 (* Add it to the list. *) 481 outputStreamList := strm :: ! outputStreamList; 482 strm 483 end 484 485 val mkOutstream = LibraryIOSupport.protect ostreamLock mkOutstream' 486 487 fun getBufferMode(OutStream{buffType=ref b, ...}) = b 488 489 local 490 (* Flush anything from the buffer. *) 491 fun flushOut'(OutStream{buf, bufp=bufp as ref endBuf, 492 wrtr=wrtr as WR{name, ...}, ...}) = 493 if endBuf = 0 then () (* Nothing buffered *) 494 else case wrtr of 495 WR{writeArr=SOME wa, ...} => 496 let 497 fun flushBuff n = 498 let 499 val written = 500 wa(ArraySlice.slice(buf, n, SOME(endBuf-n))) 501 handle exn => raise mapToIo(exn, name, "flushOut") 502 in 503 if written+n = endBuf then () 504 else flushBuff(written+n) 505 end 506 in 507 (* Set the buffer to empty BEFORE writing anything. If 508 we get an asynchronous interrupt (ctrl-C) we want to 509 lose data in preference to duplicating it. *) 510 bufp := 0; 511 flushBuff 0 512 end 513 | _ => 514 raise Io { name = name, function = "flushOut", 515 cause = BlockingNotSupported } 516 517 (* Terminate a stream either because it has been closed or 518 because we have extracted the underlying writer. *) 519 fun terminateStream'(OutStream{isTerm=ref true, ...}) = () (* Nothing to do. *) 520 | terminateStream'(f as OutStream{isTerm, ...}) = 521 let 522 (* outstream is not an equality type but we can get the 523 desired effect by comparing the isTerm references for 524 equality (N.B. NOT their contents). *) 525 fun removeThis(OutStream{isTerm=isTerm', ...}) = 526 isTerm' <> isTerm 527 open Thread.Mutex 528 in 529 isTerm := true; 530 lock ostreamLock; 531 outputStreamList := List.filter removeThis (!outputStreamList); 532 unlock ostreamLock; 533 flushOut' f 534 end; 535 536 (* Close the stream. It is safe to repeat this and we may need to close 537 the writer even if the stream is terminated. *) 538 fun closeOut'(f as OutStream{wrtr=WR{close, ...}, ...}) = 539 ( 540 terminateStream' f; 541 close() (* Close the underlying writer. *) 542 ) 543 544 (* Flush the stream, terminate it and return the underlying writer. *) 545 fun getWriter'(OutStream{wrtr=WR{name, ...}, isTerm=ref true, ...}) = 546 (* Already terminated. *) 547 raise Io { name = name, function = "getWriter", 548 cause = ClosedStream } 549 | getWriter'(f as OutStream{buffType, wrtr, ...}) = 550 ( 551 terminateStream' f; 552 (wrtr, !buffType) 553 ) 554 555 (* Set the buffer mode, possibly flushing the buffer as it does. *) 556 fun setBufferMode' newBuff (f as OutStream{buffType, bufp, ...}) = 557 (* Question: What if the stream is terminated? *) 558 ( 559 if newBuff = NO_BUF andalso !bufp <> 0 560 then (* Flush pending output. *) 561 (* Switching from block to line buffering does not flush. *) 562 flushOut' f 563 else (); 564 buffType := newBuff 565 ) 566 567 (* Internal function: Write a vector directly to the writer. It only 568 returns when the vector has been completely written. 569 "output" should work if the writer only provides writeArr so we 570 may have to use that if writeVec isn't there. *) 571 (* FOR TESTING: Put writeArr first. *) 572 fun writeVec(OutStream{wrtr=WR{writeVec=SOME wv, name, ...}, ...}, v, i, len) = 573 let 574 fun forceOut p = 575 let 576 val written = wv(VectorSlice.slice(v, p+i, SOME(len-p))) 577 handle exn => raise mapToIo(exn, name, "output") 578 in 579 if written+p = len then () 580 else forceOut(written+p) 581 end 582 in 583 forceOut 0 584 end 585 | writeVec(OutStream{wrtr=WR{writeArr=SOME wa, name, ...}, ...}, v, i, len) = 586 let 587 val buffSize = 10 588 val buff = Array.array(buffSize, someElem); 589 fun forceOut p = 590 let 591 val toCopy = Int.min(len-p, buffSize) 592 val () = 593 ArraySlice.copyVec{src=VectorSlice.slice(v, p+i, SOME toCopy), dst=buff, di=0} 594 val written = wa(ArraySlice.slice(buff, 0, SOME toCopy)) 595 handle exn => raise mapToIo(exn, name, "output") 596 in 597 if written+p = len then () 598 else forceOut(written+p) 599 end 600 in 601 forceOut 0 602 end 603 | writeVec(OutStream{wrtr=WR{name, ...}, ...}, _, _, _) = 604 raise Io { name = name, function = "output", 605 cause = BlockingNotSupported } 606 607 (* Internal function. Write a vector to the stream using the start and 608 length provided. *) 609 fun outputVector _ (OutStream{isTerm=ref true, wrtr=WR{name, ...}, ...}) = 610 raise Io { name = name, function = "output", cause = ClosedStream } 611 | outputVector (v, start, vecLen) (f as OutStream{buffType, buf, bufp, ...}) = 612 let 613 val buffLen = Array.length buf 614 615 fun arrayCopyVec{src: Vector.vector, si: int, len: int, dst: Array.array, di: int} = 616 ArraySlice.copyVec{src=VectorSlice.slice(src, si, SOME len), dst=dst, di=di}; 617 618 fun addVecToBuff () = 619 if vecLen < buffLen - !bufp 620 then (* Room in the buffer. *) 621 ( 622 arrayCopyVec{src=v, si=start, len=vecLen, dst=buf, di= !bufp}; 623 bufp := !bufp + vecLen 624 ) 625 else 626 let 627 val buffSpace = buffLen - !bufp 628 in 629 (* Copy as much of the vector as will fit *) 630 arrayCopyVec{src=v, si=start, len=buffSpace, dst=buf, di= !bufp}; 631 bufp := !bufp+buffSpace; 632 (* TODO: Flushing the buffer ensures that all the 633 buffer contents have been written. We don't 634 actually need that, what we need is for enough 635 to have been written that we have space in the 636 buffer for the rest of the vector. *) 637 flushOut' f; (* Write it out. *) 638 (* Copy the rest of the vector. *) 639 arrayCopyVec{src=v, si=start+buffSpace, len=vecLen-buffSpace, dst=buf, di=0}; 640 bufp := vecLen-buffSpace 641 end (* addVecToBuff *) 642 in 643 if vecLen > buffLen 644 then (* If the vector is too large to put in the buffer we're 645 going to have to write something out. To reduce copying 646 we simply flush the buffer and write the vector directly. *) 647 (flushOut' f; writeVec(f, v, start, vecLen)) 648 else (* Try copying to the buffer. *) 649 if !buffType = IO.NO_BUF 650 then (* Write it directly *) writeVec(f, v, start, vecLen) 651 else (* Block or line buffering - add it to the buffer. 652 Line buffering is treated as block buffering on binary 653 streams and handled at the higher level for text streams. *) 654 addVecToBuff() 655 end (* outputVec *) 656 657 (* This could be defined in terms of outputVector but this is 658 likely to be much more efficient if we are buffering. *) 659 fun output1' _ (OutStream{isTerm=ref true, wrtr=WR{name, ...}, ...}) = 660 raise Io { name = name, function = "output1", cause = ClosedStream } 661 | output1' c (f as OutStream{buffType, buf, bufp, ...}) = 662 if !buffType = IO.NO_BUF 663 then writeVec(f, Vector.fromList[c], 0, 1) 664 else (* Line or block buffering. *) 665 ( 666 Array.update(buf, !bufp, c); 667 bufp := !bufp + 1; 668 if !bufp = Array.length buf then flushOut' f else () 669 ) 670 671 fun getPosOut'(f as OutStream{wrtr=WR{name, getPos=SOME getPos, ...}, ...}) = 672 ( 673 flushOut' f; 674 OutPos(f, getPos()) handle exn => raise mapToIo(exn, name, "getPosOut") 675 ) 676 677 | getPosOut'(OutStream{wrtr=WR{name, ...}, ...}) = 678 raise Io { name = name, function = "getPosOut", 679 cause = RandomAccessNotSupported } 680 681 fun setPosOut' p (f as OutStream{wrtr=WR{setPos=SOME setPos, ...}, ...}) = 682 ( 683 flushOut' f; 684 setPos p; 685 f 686 ) 687 | setPosOut' _ (OutStream{wrtr=WR{name, ...}, ...}) = 688 raise Io { name = name, function = "setPosOut", 689 cause = RandomAccessNotSupported } 690 in 691 fun output1(f, c) = protectOut (output1' c) f 692 fun output(f, v) = protectOut (outputVector(v, 0, Vector.length v)) f 693 val flushOut = protectOut flushOut' 694 val closeOut = protectOut closeOut' 695 val getWriter = protectOut getWriter' 696 fun setBufferMode(f, n) = protectOut (setBufferMode' n) f 697 698 (* Exported function to output part of a vector. Non-standard. *) 699 fun outputVec(f, slice) = 700 let 701 val (v, i, len) = VectorSlice.base slice 702 in 703 protectOut (outputVector(v, i, len)) f 704 end 705 706 val getPosOut = protectOut getPosOut' 707 708 fun setPosOut(OutPos(f, p)) = protectOut (setPosOut' p) f 709 end 710 711 712 fun filePosOut(OutPos(_, p)) = p 713 714 (* We need to set up a function to flush the streams when we 715 exit. This has to be set up for every session so we set up 716 an entry function, which is persistent, to do it. *) 717 local 718 fun closeAll () = 719 (* Close all the streams. closeOut removes the streams 720 from the list so we should end up with outputStreamList 721 being nil. *) 722 List.foldl (fn (s, ()) => closeOut s handle _ => ()) () 723 (! outputStreamList) 724 725 fun doOnEntry () = OS.Process.atExit closeAll 726 in 727 val () = PolyML.onEntry doOnEntry; 728 val () = doOnEntry() (* Set it up for this session as well. *) 729 end 730 731 local 732 open PolyML 733 fun printWithName(s, name) = 734 PolyML.PrettyString(String.concat[s, "-\"", String.toString name, "\""]) 735 736 fun prettyIn depth a (Committed { rest, ...}) = 737 prettyIn depth a rest 738 | prettyIn _ _ (Uncommitted { state = ref streamState, ...}) = 739 let 740 fun prettyState Truncated = 741 PolyML.PrettyString("Instream-truncated") 742 | prettyState (HaveRead{ rest = ref rest, ...}) = 743 prettyState rest 744 | prettyState (ToRead(RD{name, ...})) = 745 printWithName("Instream", name) 746 in 747 prettyState streamState 748 end 749 750 fun prettyOut _ _ (OutStream { wrtr = WR { name, ...}, ...}) = 751 printWithName("Outstream", name) 752 in 753 val () = addPrettyPrinter prettyIn 754 val () = addPrettyPrinter prettyOut 755 end 756end; 757 758(* Define the StreamIO functor in terms of BasicStreamIO to filter 759 out outputVec. *) 760(* This is non-standard. According to G&R 2004 StreamIO does not take the slice structures as args. *) 761functor StreamIO( 762 structure PrimIO : PRIM_IO 763 structure Vector : MONO_VECTOR 764 structure Array : MONO_ARRAY 765 structure VectorSlice: MONO_VECTOR_SLICE 766 structure ArraySlice: MONO_ARRAY_SLICE 767 sharing type PrimIO.elem = Vector.elem = Array.elem = VectorSlice.elem = ArraySlice.elem 768 sharing type PrimIO.vector = Vector.vector = Array.vector = VectorSlice.vector = ArraySlice.vector 769 sharing type PrimIO.array = Array.array = ArraySlice.array 770 sharing type PrimIO.vector_slice = VectorSlice.slice = ArraySlice.vector_slice 771 sharing type PrimIO.array_slice = ArraySlice.slice 772 val someElem : PrimIO.elem 773 ): STREAM_IO = 774struct 775 structure StreamIO = 776 BasicStreamIO(structure PrimIO = PrimIO 777 and Vector = Vector 778 and Array = Array 779 and VectorSlice = VectorSlice 780 and ArraySlice = ArraySlice 781 val someElem = someElem) 782 open StreamIO 783end; 784