(*
    Title:      Thread package for ML.
    Author:     David C. J. Matthews
    Copyright (c) 2007-2014, 2018

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License version 2.1 as published by the Free Software Foundation.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This signature and structure are not part of the standard basis library
   but are included here because they depend on the Time structure and are
   in turn dependencies of the BasicIO structure. *)

signature THREAD =
sig
    exception Thread of string (* Raised if an operation fails. *)

    structure Thread:
    sig
        eqtype thread

        (* Thread attributes - This may be extended. *)
        datatype threadAttribute =
            (* Does this thread accept a broadcast interrupt?  The default is not to
               accept broadcast interrupts. *)
            EnableBroadcastInterrupt of bool
            (* How to handle interrupts.  The default is to handle interrupts synchronously. *)
        |   InterruptState of interruptState
            (* Maximum size of the ML stack in words. NONE means unlimited *)
        |   MaximumMLStack of int option

        and interruptState =
            InterruptDefer (* Defer any interrupts. *)
        |   InterruptSynch (* Interrupts are delivered synchronously. An interrupt
               will be delayed until an interruption point.  An interruption point
               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
               and various library calls that may block, such as IO calls, pause etc.
               N.B.  Mutex.lock is not an interruption point even though it can result
               in a thread blocking for an indefinite period. *)
        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
               point soon after they are triggered. *)
        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
               is delivered asynchronously after which the interrupt state is changed to
               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
               that it has been interrupted without the risk of a second asynchronous
               interrupt occurring in the handler for the first interrupt. *)

        (* fork: Fork a thread.  Starts a new thread running the function argument.  The
           attribute list gives initial values for thread attributes which can be
           modified by the thread itself.  Any unspecified attributes take default values.
           The thread is terminated when the thread function returns, if it
           raises an uncaught exception or if it calls "exit". *)
        val fork: (unit->unit) * threadAttribute list -> thread
        (* exit: Terminate this thread. *)
        val exit: unit -> unit
        (* isActive: Test if a thread is still running or has terminated. *)
        val isActive: thread -> bool

        (* Test whether thread ids are the same.  No longer needed if this is an eqtype. *)
        val equal: thread * thread -> bool
        (* Get my own ID. *)
        val self: unit -> thread

        exception Interrupt (* = SML90.Interrupt *)
        (* Send an Interrupt exception to a specific thread.  When and indeed whether
           the exception is actually delivered will depend on the interrupt state
           of the target thread.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val interrupt: thread -> unit
        (* Send an interrupt exception to every thread which is set to accept it. *)
        val broadcastInterrupt: unit -> unit
        (* If this thread is handling interrupts synchronously, test to see
           if it has been interrupted.  If so it raises the Interrupt
           exception. *)
        val testInterrupt: unit -> unit
        (* Terminate a thread.  This should be used as a last resort.  Normally
           a thread should be allowed to clean up and terminate by using the
           interrupt call.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val kill: thread -> unit

        (* Get and set thread-local store for the calling thread.  The store is a
           tagged associative memory which is initially empty for a new thread.
           A thread can call setLocal to add or replace items in its store and
           call getLocal to return values if they exist.  The Universal structure
           contains functions to make new tags as well as injection, projection and
           test functions. *)
        val getLocal: 'a Universal.tag -> 'a option
        val setLocal: 'a Universal.tag * 'a -> unit

        (* Change the specified attribute(s) for the calling thread.  Unspecified
           attributes remain unchanged. *)
        val setAttributes: threadAttribute list -> unit
        (* Get the values of attributes. *)
        val getAttributes: unit -> threadAttribute list

        (* Return the number of processors that will be used to run threads. *)
        val numProcessors: unit -> int
        (* and the number of physical processors if that is available. *)
        and numPhysicalProcessors: unit -> int option
    end

    structure Mutex:
    sig
        (* Mutexes.  A mutex provides simple mutual exclusion.  A thread can lock
           a mutex and until it unlocks it no other thread will be able to lock it.
           Locking and unlocking are intended to be fast in the situation when
           there is no other process attempting to lock the mutex. *)
        type mutex
        (* mutex: Make a new mutex *)
        val mutex: unit -> mutex
        (* lock: Lock a mutex.  If the mutex is currently locked the thread is
           blocked until it is unlocked.  If a thread tries to lock a mutex that
           it has previously locked the thread will deadlock.
           N.B.  "lock" is not an interruption point (a point where synchronous
           interrupts are delivered) even though a thread can be blocked indefinitely. *)
        val lock: mutex -> unit
        (* unlock: Unlock a mutex and allow any waiting threads to run.  The behaviour
           if the mutex was not previously locked by the calling thread is undefined. *)
        val unlock: mutex -> unit
        (* trylock: Attempt to lock the mutex.  Returns true if the mutex was not
           previously locked and has now been locked by the calling thread.  Returns
           false if the mutex was previously locked, including by the calling thread. *)
        val trylock: mutex -> bool

        (* These functions may not work correctly if an asynchronous interrupt
           is delivered during the calls.  A thread should use synchronous interrupt
           when using these calls. *)
    end

    structure ConditionVar:
    sig
        (* Condition variables.  Condition variables are used to provide communication
           between threads.  A condition variable is used in conjunction with a mutex
           and usually a reference to establish and test changes in state.  The normal
           use is for one thread to lock a mutex, test the reference and then wait on
           the condition variable, releasing the lock on the mutex while it does so.
           Another thread may then lock the mutex, update the reference, unlock the
           mutex, and signal the condition variable.  This wakes up the first thread
           and reacquires the lock allowing the thread to test the updated reference
           with the lock held.
           More complex communication mechanisms, such as blocking channels, can
           be written in terms of condition variables. *)
        type conditionVar
        (* conditionVar: Make a new condition variable. *)
        val conditionVar: unit -> conditionVar
        (* wait: Release the mutex and block until the condition variable is
           signalled.  When wait returns the mutex has been re-acquired.
           If thread is handling interrupts synchronously a call to "wait" may cause
           an Interrupt exception to be delivered.
           (The implementation must ensure that if an Interrupt is delivered as well
           as signal waking up a single thread that the interrupted thread does not
           consume the "signal".)
           The mutex is (re)acquired before Interrupt is delivered. *)
        val wait: conditionVar * Mutex.mutex -> unit
        (* waitUntil: As wait except that it blocks until either the condition
           variable is signalled or the time (absolute) is reached.  Either way
           the mutex is reacquired so there may be a further delay if it is held
           by another thread.
           The result is true if the thread was woken by signal/broadcast and
           false if the time was reached first. *)
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        (* signal: Wake up one thread if any are waiting on the condition variable. *)
        val signal: conditionVar -> unit
        (* broadcast: Wake up all threads waiting on the condition variable. *)
        val broadcast: conditionVar -> unit
    end

end;

structure Thread :> THREAD =
struct
    exception Thread = RunCall.Thread

    (* Create non-overwritable mutables for mutexes and condition variables.
       A non-overwritable mutable in the executable or a saved state is not
       overwritten when a saved state further down the hierarchy is loaded. *)
    val nvref = LibrarySupport.noOverwriteRef

    structure Thread =
    struct
        open Thread (* Created in INITIALISE with thread type and self function. *)

        (* Equality is pointer equality. *)
        val equal : thread*thread->bool = op =

        datatype threadAttribute =
            EnableBroadcastInterrupt of bool
        |   InterruptState of interruptState
        |   MaximumMLStack of int option

        and interruptState =
            InterruptDefer
        |   InterruptSynch
        |   InterruptAsynch
        |   InterruptAsynchOnce

        (* Convert attributes to bits and a mask.
           The result is (bits, mask): bit 0w1 carries EnableBroadcastInterrupt and
           the two bits 0w6 carry the interrupt state.  "mask" has a bit set for
           every attribute that was present in the list so that the caller can
           leave the others unchanged.  MaximumMLStack does not contribute to
           either word; it is handled separately by newStackSize.
           Raises Thread if the same flag attribute appears more than once. *)
        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
        let
            (* Check that a particular attribute appears only once.
               As well as accumulating the actual bits in the result we
               also accumulate the mask of bits.  If any of these
               reappear we raise an exception. *)
            fun checkRepeat(r, acc, set, mask) =
                if Word.andb(set, mask) <> 0w0
                then raise Thread "The same attribute appears more than once in the list"
                else convert(r, acc, Word.orb(set, mask))

            and convert([], acc, set) = (acc, set)
              | convert(EnableBroadcastInterrupt true :: r, acc, set) =
                    checkRepeat(r, Word.orb(acc, 0w1), set, 0w1)
              | convert(EnableBroadcastInterrupt false :: r, acc, set) =
                    checkRepeat(r, acc (* No bit *), set, 0w1)
              | convert(InterruptState s :: r, acc, set) =
                    checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6)
              | convert(MaximumMLStack _ :: r, acc, set) =
                    convert(r, acc, set)
        in
            convert(at, 0w0, 0w0)
        end

        (* Encode an interrupt state in the two bits covered by the mask 0w6. *)
        and setIstateBits InterruptDefer = 0w0
          | setIstateBits InterruptSynch = 0w2
          | setIstateBits InterruptAsynch = 0w4
          | setIstateBits InterruptAsynchOnce = 0w6

        (* Decode the interrupt state from a flags word: the inverse of setIstateBits.
           Bits outside the mask 0w6 are ignored. *)
        fun getIstateBits(w: Word.word): interruptState =
        let
            val ibits = Word.andb(w, 0w6)
        in
            if ibits = 0w0
            then InterruptDefer
            else if ibits = 0w2
            then InterruptSynch
            else if ibits = 0w4
            then InterruptAsynch
            else InterruptAsynchOnce
        end

        (* Convert a flags word back to the attributes it encodes.  The stack
           size is not held in the flags word so it is not included here. *)
        fun wordToAttrs w =
        let
            (* Enable broadcast - true if bottom bit is set. *)
            val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1)
        in
            [bcast, InterruptState(getIstateBits w)]
        end

        exception Interrupt = RunCall.Interrupt

        (* The thread id is opaque outside this structure but is actually a six
           word mutable object.
           Word 0: Index into thread table (used inside the RTS only)
           Word 1: Flags: initialised by the RTS and set by this code
           Word 2: Thread local store: read and set by this code.
           Word 3: IntRequest: Set by the RTS if there is an interrupt pending
           Word 4: Maximum ML stack size.  Unlimited is stored here as zero
           (Only words 1 to 4 are accessed by the code in this file.)
        *)
        val threadIdFlags       = 0w1
        and threadIdThreadLocal = 0w2
        and threadIdIntRequest  = 0w3
        and threadIdStackSize   = 0w4

        (* Look up the value associated with tag "t" in the calling thread's
           local store.  Returns NONE if there is no entry with that tag. *)
        fun getLocal (t: 'a Universal.tag) : 'a option =
        let
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] = NONE
              | doFind ((ref v)::r) =
                    if Universal.tagIs t v
                    then SOME(Universal.tagProject t v)
                    else doFind r
        in
            doFind root
        end

        (* Add or replace the value associated with tag "t" in the calling
           thread's local store.  An existing entry is updated in place; a new
           entry is added to the front of the list held in the thread object. *)
        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
        let
            (* See if we already have this in the list. *)
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] =
                    (* Not in the list - Add it. *)
                    RunCall.storeWord
                        (self(), threadIdThreadLocal,
                         ref (Universal.tagInject t newVal) :: root)
              | doFind (v::r) =
                    if Universal.tagIs t (!v)
                        (* If it's in the list update it. *)
                    then v := Universal.tagInject t newVal
                    else doFind r

        in
            doFind root
        end

        local
            val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt"
        in
            (* Only call into the RTS if the thread object shows a pending request. *)
            fun testInterrupt() =
                (* If there is a pending request the word in the thread object
                   will be non-zero. *)
                if RunCall.loadWord(self(), threadIdIntRequest) <> 0
                then threadTestInterrupt()
                else ()
        end

        local
            (* Read the flags word of a thread object. *)
            fun getAttrWord (me: thread) : Word.word =
                RunCall.loadWord(me, threadIdFlags)

            (* Read the raw stack size word; zero means unlimited. *)
            fun getStackSizeAsInt (me: thread) : int =
                RunCall.loadWord(me, threadIdStackSize)

            and getStackSize me : int option =
                case getStackSizeAsInt me of
                    0 => NONE
                |   s => SOME s

            (* Find the stack size requested in an attribute list.  The first
               MaximumMLStack entry is used; if there is none the result is
               "default".  NONE (unlimited) is represented as zero. *)
            fun newStackSize ([], default) = default
              | newStackSize (MaximumMLStack NONE :: _, _) = 0
              | newStackSize (MaximumMLStack (SOME n) :: _, _) =
                    if n <= 0 then raise Thread "The stack size must be greater than zero" else n
              | newStackSize (_ :: l, default) = newStackSize (l, default)

            val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize"
        in
            (* Set attributes.  Only changes the values that are specified.  The
               others remain the same. *)
            fun setAttributes (attrs: threadAttribute list) : unit =
            let
                val me = self()
                val oldValues: Word.word = getAttrWord me
                val (newValue, mask) = attrsToWord attrs
                val stack = newStackSize(attrs, getStackSizeAsInt me)
            in
                (* Bits named in the mask come from newValue, the rest are kept. *)
                RunCall.storeWord (me, threadIdFlags,
                    Word.orb(newValue, Word.andb(Word.notb mask, oldValues)));
                (* Only call the RTS if the stack size has actually changed. *)
                if stack = getStackSizeAsInt me
                then () else threadMaxStackSize stack;
                (* If we are now handling interrupts asynchronously check whether
                   we have a pending interrupt now.  This will only be effective
                   if we were previously handling them synchronously or blocking
                   them. *)
                if Word.andb(newValue, 0w4) = 0w4
                then testInterrupt()
                else ()
            end

            (* Return the current attributes of the calling thread: the stack
               limit followed by the attributes held in the flags word. *)
            fun getAttributes() : threadAttribute list =
            let
                val me = self()
            in
                MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me)
            end

            (* These are used in the ConditionVar structure.  They affect only the
               interrupt handling bits. *)
            fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self()))
            and setInterruptState(s: interruptState): unit =
                RunCall.storeWord (self(), threadIdFlags,
                    Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self()))))

            local
                (* The default for a new thread is to ignore broadcasts and handle explicit
                   interrupts synchronously. *)
                val (defaultAttrs, _) =
                    attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch]
                val threadForkFunction:
                    (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread"
            in
                fun fork(f:unit->unit, attrs: threadAttribute list): thread =
                let
                    (* Any attributes specified explicitly override the defaults. *)
                    val (attrWord, mask) = attrsToWord attrs
                    val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs))
                    val stack = newStackSize(attrs, 0 (* Default is unlimited *))
                in
                    threadForkFunction(f, attrValue, stack)
                end
            end
        end

        val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf"
        and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive"
        and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt"

        local
            (* Send an interrupt to a thread.  If it returns false
               the thread did not exist and this should raise an exception. *)
            val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread"
        in
            fun interrupt(t: thread) =
                if threadSendInterrupt t
                then ()
                else raise Thread "Thread does not exist"
        end

        local
            (* As with threadSendInterrupt a false result is turned into an exception. *)
            val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread"
        in
            fun kill(t: thread) =
                if threadKillThread t
                then ()
                else raise Thread "Thread does not exist"
        end

        val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors"

        local
            val numberOfPhysical: unit -> int =
                RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors"
        in
            fun numPhysicalProcessors(): int option =
                (* It is not always possible to get this information *)
                case numberOfPhysical() of 0 => NONE | n => SOME n
        end
    end

    structure Mutex =
    struct
        type mutex = Word.word ref
        fun mutex() = nvref 0w1; (* Initially unlocked. *)
        open Thread  (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *)

        val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock"
        val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock"

        (* A mutex is implemented as a Word.word ref.  It is initially set to 1 and locked
           by atomically decrementing it.  If it was previously unlocked the result will
           be zero but if it was already locked it will be some negative value.  When it
           is unlocked it is atomically incremented.  If there was no contention the result
           will again be 1 but if some other thread tried to lock it the result will be
           zero or negative.  In that case the unlocking thread needs to call in to the
           RTS to wake up the blocked thread.

           The cost of contention on the lock is very high.  To try to avoid this we
           first loop (spin) to see if we can get the lock without contention. *)

        val spin_cycle = 20000
        (* Busy-wait until the mutex appears unlocked (its value is 1) or until
           the count reaches spin_cycle.  This only reads the mutex; it does
           not acquire it. *)
        fun spin (m: mutex, c: int) =
           if ! m = 0w1 then ()
           else if c = spin_cycle then ()
           else spin(m, c+1);

        fun lock (m: mutex): unit =
        let
            val () = spin(m, 0)
            val newValue = atomicDecr m
        in
            if newValue = 0w0
            then () (* We've acquired the lock. *)
            else (* It's locked.  We return when we have the lock. *)
            (
                threadMutexBlock m;
                lock m (* Try again. *)
            )
        end

        fun unlock (m: mutex): unit =
        let
            val newValue = atomicIncr m
        in
            if newValue = 0w1
            then () (* No contention. *)
            else
                (* Another thread has blocked and we have to release it.  We can safely
                   set the value to 1 here to release the lock.  If another thread
                   acquires it before we have woken up the other threads that's fine.
                   Equally, if another thread decremented the count and saw it was
                   still locked it will enter the RTS and try to acquire the lock
                   there.
                   It's probably better to reset it here rather than within the RTS
                   since it allows another thread to acquire the lock immediately
                   rather than after the rather long process of entering the RTS.
                   Resetting this needs to be atomic with respect to atomic increment
                   and decrement.  That's not a problem on X86 so a simple assignment
                   is sufficient but in the interpreter at least it's necessary to
                   acquire a lock. *)
            (
                atomicReset m;
                threadMutexUnlock m
            )
        end

        (* Try to lock the mutex.  If it was previously unlocked then lock it and
           return true otherwise return false.  Because we don't block here there is
           the possibility that the thread that has locked it could release the lock
           shortly afterwards.  The check for !m = 0w1 is an optimisation and nearly
           all the time it avoids the call to atomicDecr setting m to a negative value.
           There is a small chance that another thread could lock the mutex between the
           test for !m = 0w1 and the atomicDecr.  In that case the atomicDecr would
           return a negative value and the function that locked the mutex will have to
           call into the RTS to reset it when it is unlocked.  *)
        fun trylock (m: mutex): bool =
            if !m = 0w1 andalso atomicDecr m = 0w0
            then true (* We've acquired the lock. *)
            else false (* The lock was taken. *)
    end

    structure ConditionVar =
    struct
        open Thread

        (* A condition variable contains a lock and a list of suspended threads. *)
        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
        fun conditionVar(): conditionVar =
            { lock = Mutex.mutex(), threads = nvref nil }

        local
            val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait"
            and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil"
        in
            (* Add the calling thread to the waiting list, release the external
               mutex "m" and wait.  "t" is the optional absolute time-out.
               Returns true if the thread was removed from the list by another
               thread (signal/broadcast) and false if the time-out was reached.
               The external mutex is NOT reacquired here; doWait does that. *)
            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val me = self() (* My thread id. *)

                fun waitAgain() =
                let
                    (* Is this thread in the list? *)
                    fun doFind [] = false
                      | doFind (h::rest) = equal(h, me) orelse doFind rest

                    (* Remove the first occurrence of this thread from the list. *)
                    fun removeThis [] = raise Fail "Thread missing in list"
                      | removeThis (h::rest) = if equal(h, me) then rest else h :: removeThis rest

                    val () =
                        case t of
                            SOME time => threadCondVarWaitUntil(lock, time)
                        |   NONE => threadCondVarWait lock

                    val () = Mutex.lock lock (* Get the lock again. *)

                    (* Are we still on the list?  If so we haven't been explicitly woken
                       up.  We've either timed out, been interrupted or simply returned
                       because the RTS needed to process some asynchronous results.  *)
                    val stillThere = doFind(!threads)
                    open Time (* For >= *)
                in
                    if not stillThere
                    then (* We're done. *)
                    (
                        Mutex.unlock lock;
                        true
                    )
                    else if (case t of NONE => false | SOME limit => Time.now() >= limit)
                    then (* We've timed out. *)
                    (
                        threads := removeThis(! threads);
                        Mutex.unlock lock;
                        false
                    )
                    else
                    (
                        (* See if we've been interrupted.  If so remove ourselves
                           and exit. *)
                        testInterrupt()
                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
                        (* Otherwise just keep waiting. *)
                        waitAgain()
                    )
                end
            in
                Mutex.lock lock; (* Lock the internal mutex. *)
                Mutex.unlock m; (* Unlock the external mutex *)
                threads := me :: !threads; (* Add ourselves to the list. *)
                waitAgain() (* Wait and return the result when we're done. *)
            end

            (* Common code for wait and waitUntil.  Switches the thread to
               synchronous interrupt handling for the duration of the wait,
               then restores the interrupt state and reacquires "m". *)
            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val originalIntstate = getInterruptState()
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them. *)
                val () =
                    if originalIntstate = InterruptDefer
                    then ()
                    else setInterruptState InterruptSynch;

                (* Wait for the condition.  If it raises an exception we still
                   need to reacquire the lock unless we were handling interrupts
                   asynchronously. *)
                val result =
                    innerWait(c, m, t) handle exn =>
                        (
                            (* We had an exception.  If we were handling interrupts synchronously
                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
                               counts as a single asynchronous exception and we restore the
                               state as InterruptSynch. *)
                            case originalIntstate of
                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
                            |   InterruptSynch => Mutex.lock m
                            |   InterruptAsynch => setInterruptState InterruptAsynch
                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;

                            raise exn (* Reraise the exception*)
                        )
            in
                (* Restore the original interrupt state first. *)
                setInterruptState originalIntstate;
                (* Normal return.  Reacquire the lock before returning. *)
                Mutex.lock m;
                result
            end

            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
                (doWait(c, m, NONE); ())
            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
                doWait(c, m, SOME t)
        end

        local
            (* This call wakes up the specified thread.  If the thread has already been
               interrupted and is not ignoring interrupts it returns false.  Otherwise
               it wakes up the thread and returns true.  We have to use this because
               we define that if a thread is interrupted before it is signalled then
               it raises Interrupt. *)
            val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake"

            (* Wake a single thread if we can (signal).  Returns the list with
               the woken thread removed; threads that could not be woken stay
               in the list. *)
            fun wakeOne [] = []
              | wakeOne (thread::rest) =
                    if threadCondVarWake thread
                    then rest
                    else thread :: wakeOne rest
            (* Wake all threads (broadcast). *)
            fun wakeAll [] = [] (* Always returns the empty list. *)
              | wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest)

            (* Common code for signal and broadcast: apply "wakeThreads" to the
               waiting list while holding the condition variable's internal lock. *)
            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
            let
                val originalState = getInterruptState()
            in
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them.  We need to do this to avoid an asynchronous
                   interrupt which could leave the internal lock in an inconsistent state. *)
                if originalState = InterruptDefer
                then ()
                else setInterruptState InterruptSynch;
                (* Get the condition var lock. *)
                Mutex.lock lock;
                threads := wakeThreads(! threads);
                Mutex.unlock lock;
                setInterruptState originalState; (* Restore original state. *)
                (* Test if we were interrupted while we were handling
                   interrupts synchronously. *)
                if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce
                then testInterrupt()
                else ()
            end
        in
            fun signal cv = signalOrBroadcast(cv, wakeOne)
            and broadcast cv = signalOrBroadcast(cv, wakeAll)
        end
    end
end;

structure ThreadLib:
sig
    val protect: Thread.Mutex.mutex -> ('a -> 'b) -> 'a -> 'b
end =
struct
    (* This applies a function while a mutex is being held.
       Although this can be defined in terms of Thread.Thread.getAttributes it's
       defined here using the underlying calls.  The original version with
       getAttributes appeared as a major allocation hot-spot when building the
       compiler because "protect" is called round every access to the global
       name-space. *)
    fun protect m f a =
    let
        open Thread.Thread Thread.Mutex
        open Word
        (* Set this to handle interrupts synchronously except if we are blocking
           them.  We don't want to get an asynchronous interrupt while we are
           actually locking or unlocking the mutex but if we have to block to do
           IO then we should allow an interrupt at that point. *)
        (* Offset 0w1 is the flags word of the thread object; the mask 0w6
           selects the interrupt state bits and 0w2 is the synchronous state. *)
        val oldAttrs: Word.word = RunCall.loadWord(self(), 0w1)
        val () =
            if andb(oldAttrs, 0w6) = 0w0 (* Already deferred? *)
            then ()
            else RunCall.storeWord (self(), 0w1,
                    orb(andb(notb 0w6, oldAttrs), 0w2))
        (* Put back the saved flags and, if interrupts were originally
           asynchronous, deliver any interrupt that arrived in the meantime. *)
        fun restoreAttrs() =
        (
            RunCall.storeWord (self(), 0w1, oldAttrs);
            if andb(oldAttrs, 0w4) = 0w4 then testInterrupt() else ()
        )
        val () = lock m
        val result = f a
            handle exn =>
            (
                unlock m; restoreAttrs();
                (* Reraise the exception preserving the location information. *)
                PolyML.Exception.reraise exn
            )
    in
        unlock m;
        restoreAttrs();
        result
    end
end;


(* Install pretty printers that display mutexes, threads and condition
   variables as "?" rather than showing their representation. *)
local
    fun prettyMutex _ _ (_: Thread.Mutex.mutex) = PolyML.PrettyString "?"
    and prettyThread _ _ (_: Thread.Thread.thread) = PolyML.PrettyString "?"
    and prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = PolyML.PrettyString "?"
in
    val () = PolyML.addPrettyPrinter prettyMutex
    and () = PolyML.addPrettyPrinter prettyThread
    and () = PolyML.addPrettyPrinter prettyCondVar
end;