(*
    Title:      Thread package for ML.
    Author:     David C. J. Matthews
    Copyright (c) 2007-2014, 2018-20

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License version 2.1 as published by the Free Software Foundation.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This signature and structure are not part of the standard basis library
   but are included here because they depend on the Time structure and are
   in turn dependencies of the BasicIO structure. *)

(*!Earlier versions of Poly/ML have provided a form of concurrent execution through
  the Process structure. Version 5.1 introduces
  new thread primitives in the Thread structure. This structure is modelled on
  the Posix thread (pthread) package but simplified and modified for ML. The aim
  is to provide an efficient implementation of parallelism particularly to enable
  ML programs to make use of multi-core processors while minimising the changes
  needed to existing code. The Process structure will continue to be available
  as a library written on top of these primitives but new programs should use
  the Thread structure directly.

The thread package differs from pthreads in a number of ways.
There is no join function to wait for the completion of a thread.
This can be written using mutexes and condition variables.
Cancellation and signal handling are combined into the interrupt
functions. (The Poly/ML Signal structure handles signals for all the
threads together).
The effect of explicit cancellation is achieved
using the interrupt function. This causes an interrupt to be
generated in a specific thread. Alternatively an interrupt can be
broadcast to all threads. This is most likely to be used
interactively to kill threads that appear to have gone out of
control. The normal top-level handler for a console interrupt will
generate this. Threads can choose how or whether they respond to
these interrupts. A thread that is doing processor-intensive work
probably needs to be able to be interrupted asynchronously whereas if
it is communicating with other threads the presence of asynchronous
interrupts makes correct programming difficult.
*)

signature THREAD =
sig
    (*!The Thread exception can be raised by various of the functions in the
       structure if they detect an error.*)
    exception Thread of string (* Raised if an operation fails. *)

    structure Thread:
    sig
        (*!The type of a thread identifier.*)
        eqtype thread

        (* Thread attributes - This may be extended. *)
        (*!The type of a thread attribute. Thread attributes are
            properties of the thread that are set initially when the thread is
            created but can subsequently be modified by the thread itself.  The
            thread attribute type may be extended in the future to include things
            like scheduling priority. The current thread attributes control the
            way interrupt exceptions are delivered to the thread.

            `EnableBroadcastInterrupt` controls whether the thread will receive an interrupt sent using
            `broadcastInterrupt` or as a result of pressing the console interrupt
            key. If this is false the thread will not receive them.  The default
            for a new thread if this is not specified is false.

            `InterruptState` controls when and whether interrupts are delivered to the
            thread.  This includes broadcast interrupts and also interrupts directed at
            a specific thread with the interrupt call.
            `InterruptDefer` means the thread
            will not receive any interrupts.  However, if the thread has previously been
            interrupted the interrupt may be delivered when the thread calls setAttributes
            to change its interrupt state.  `InterruptSynch`
            means interrupts are delivered
            synchronously. An interrupt will be delayed until an interruption point.  An
            interruption point is one of: `testInterrupt`,
            `ConditionVar.wait`, `ConditionVar.waitUntil`
            and various library calls that may block, such as IO calls, pause etc.  N.B.
            `Mutex.lock` is not an interruption point even though it can result in a thread
            blocking for an indefinite period.  `InterruptAsynch` means interrupts are delivered
            asynchronously i.e. at a suitable point soon after they are triggered.
            `InterruptAsynchOnce`
            means that only a single interrupt is delivered asynchronously after which
            the interrupt state is changed to `InterruptSynch`.  It allows a thread to tidy
            up and if necessary indicate that it has been interrupted without the risk
            of a second asynchronous interrupt occurring in the handler for the first
            interrupt.  If this attribute is not specified when a thread is created the
            default is `InterruptSynch`.

            `MaximumMLStack` was added in version 5.5.3. It controls the maximum size the
            ML stack may grow to.  It is an option type where NONE allows the stack to
            grow to the limit of the available memory whereas SOME n limits the stack
            to n words.  This is approximate since there is some rounding involved.  When
            the limit is reached the thread is sent an Interrupt exception.*)
        datatype threadAttribute =
                (* Does this thread accept a broadcast interrupt?  The default is not to
                   accept broadcast interrupts. *)
            EnableBroadcastInterrupt of bool
                (* How to handle interrupts.  The default is to handle interrupts synchronously.  *)
        |   InterruptState of interruptState
                (* Maximum size of the ML stack in words.
                   NONE means unlimited *)
        |   MaximumMLStack of int option

        and interruptState =
            InterruptDefer (* Defer any interrupts. *)
        |   InterruptSynch  (* Interrupts are delivered synchronously. An interrupt
               will be delayed until an interruption point.  An interruption point
               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
               and various library calls that may block, such as IO calls, pause etc.
               N.B.  Mutex.lock is not an interruption point even though it can result
               in a thread blocking for an indefinite period.  *)
        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
               point soon after they are triggered. *)
        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
               is delivered asynchronously after which the interrupt state is changed to
               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
               that it has been interrupted without the risk of a second asynchronous
               interrupt occurring in the handler for the first interrupt. *)

        (*!Fork a thread. Starts a new thread running
           the function argument. The attribute list gives initial values for thread attributes
           which can be modified by the thread itself. Any unspecified attributes take
           default values. The thread is terminated when the thread function returns, if
           it raises an uncaught exception or if it calls `exit`.*)
        val fork: (unit->unit) * threadAttribute list -> thread

        (*!Terminate this thread. *)
        val exit: unit -> unit
        (*!Test if a thread is still running or has terminated.  This function should be
           used with care.  The thread may be on the point of terminating and still appear
           to be active.*)
        val isActive: thread -> bool

        (*!Test whether thread ids are the same.  This is provided for backwards compatibility
           since `thread` is an eqtype.
        *)
        val equal: thread * thread -> bool
        (*!Return the thread identifier for the current thread. *)
        val self: unit -> thread

        exception Interrupt (* = SML90.Interrupt *)
        (*!Send an Interrupt exception to a specific thread.  When and indeed whether
           the exception is actually delivered will depend on the interrupt state
           of the target thread.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val interrupt: thread -> unit
        (*!Send an interrupt exception to every thread which is set to accept it. *)
        val broadcastInterrupt: unit -> unit
        (*!If this thread is handling interrupts synchronously, test to see
           if it has been interrupted.  If so it raises the
           `Interrupt` exception. *)
        val testInterrupt: unit -> unit
        (*!Terminate a thread. This should be used as a last resort.  Normally
           a thread should be allowed to clean up and terminate by using the
           interrupt call.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val kill: thread -> unit

        (*!Get and set thread-local store for the calling thread. The store is a
           tagged associative memory which is initially empty for a new thread.
           A thread can call setLocal to add or replace items in its store and
           call getLocal to return values if they exist.  The Universal structure
           contains functions to make new tags as well as injection, projection and
           test functions. *)
        val getLocal: 'a Universal.tag -> 'a option
        and setLocal: 'a Universal.tag * 'a -> unit

        (*!Change the specified attribute(s) for the calling thread.  Unspecified
           attributes remain unchanged. *)
        val setAttributes: threadAttribute list -> unit
        (*!Get the values of attributes.
        *)
        val getAttributes: unit -> threadAttribute list

        (*!Return the number of processors that will be used to run threads
           and the number of physical processors if that is available. *)
        val numProcessors: unit -> int
        and numPhysicalProcessors: unit -> int option
    end

    structure Mutex:
    sig
        (*!A mutex provides simple mutual exclusion.  A thread can lock
           a mutex and until it unlocks it no other thread will be able to lock it.
           Locking and unlocking are intended to be fast in the situation when
           there is no other thread attempting to lock the mutex.
           These functions may not work correctly if an asynchronous interrupt
           is delivered during the calls.  A thread should handle interrupts
           synchronously when using these calls. *)
        type mutex
        (*!Make a new mutex *)
        val mutex: unit -> mutex
        (*!Lock a mutex.  If the mutex is currently locked the thread is
           blocked until it is unlocked.  If a thread tries to lock a mutex that
           it has previously locked the thread will deadlock.
           N.B.  `lock` is not an interruption point
           (a point where synchronous
           interrupts are delivered) even though a thread can be blocked indefinitely. *)
        val lock: mutex -> unit
        (*!Unlock a mutex and allow any waiting threads to run.  The behaviour
           if the mutex was not previously locked by the calling thread is undefined. *)
        val unlock: mutex -> unit
        (*!Attempt to lock the mutex.  Returns true if the mutex was not
           previously locked and has now been locked by the calling thread.  Returns
           false if the mutex was previously locked, including by the calling thread. *)
        val trylock: mutex -> bool

    end

    structure ConditionVar:
    sig
        (*!Condition variables are used to provide communication
           between threads.  A condition variable is used in conjunction with a mutex
           and usually a reference to establish and test changes in state.
           The normal
           use is for one thread to lock a mutex, test the reference and then wait on
           the condition variable, releasing the lock on the mutex while it does so.
           Another thread may then lock the mutex, update the reference, unlock the
           mutex, and signal the condition variable.  This wakes up the first thread,
           which reacquires the lock so that it can test the updated reference
           with the lock held.
           More complex communication mechanisms, such as blocking channels, can
           be written in terms of condition variables. *)
        type conditionVar
        (*!Make a new condition variable. *)
        val conditionVar: unit -> conditionVar
        (*!Release the mutex and block until the condition variable is signalled.  When
            wait returns the mutex will have been re-acquired.

            If the thread is handling interrupts synchronously this function can be interrupted
            using the `Thread.interrupt` function or, if the thread is set to
            accept broadcast interrupts, `Thread.broadcastInterrupt`.  The thread
            will re-acquire the mutex before the exception is delivered.  An exception
            will only be delivered in this case if the interrupt is sent before the condition
            variable is signalled.  If the interrupt is sent after the condition variable
            is signalled the function will return normally even if it has not yet re-acquired
            the mutex.  The interrupt will be delivered on the next call to `wait`,
            `Thread.testInterrupt` or other blocking call.

            A thread should never call this function if it may receive an asynchronous
            interrupt.  It should always set its interrupt state to either
            `InterruptSynch`
            or `InterruptDefer` beforehand.
            An asynchronous interrupt may leave the condition
            variable and the mutex in an indeterminate state and could lead to deadlock.

            A condition variable should only be associated with one mutex at a time.
            All the threads waiting on a condition variable should pass the same mutex
            as argument.*)
        val wait: conditionVar * Mutex.mutex -> unit
        (*!As wait except that it blocks until either the condition
           variable is signalled or the time (absolute) is reached.  Either way
           the mutex is reacquired so there may be a further delay if it is held
           by another thread.  Returns true if the thread was woken by a signal
           or broadcast and false if the time was reached first. *)
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        (*!Wake up one thread if any are waiting on the condition variable.
           If there are several threads waiting for the condition variable one will be
           selected to run and will run as soon as it has re-acquired the lock.*)
        val signal: conditionVar -> unit
        (*!Wake up all threads waiting on the condition variable. *)
        val broadcast: conditionVar -> unit
    end

end;

structure Thread :> THREAD =
struct
    exception Thread = RunCall.Thread

    structure Thread =
    struct
        open Thread (* Created in INITIALISE with thread type and self function. *)

        (* Equality is pointer equality. *)
        val equal : thread*thread->bool = op =

        datatype threadAttribute =
            EnableBroadcastInterrupt of bool
        |   InterruptState of interruptState
        |   MaximumMLStack of int option

        and interruptState =
            InterruptDefer
        |   InterruptSynch
        |   InterruptAsynch
        |   InterruptAsynchOnce

        (* Convert attributes to bits and a mask. *)
        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
        let
            (* Check that a particular attribute appears only once.
               As well as accumulating the actual bits in the result we
               also accumulate the mask of bits.  If any of these
               reappear we raise an exception.
            *)
            fun checkRepeat(r, acc, set, mask) =
                if Word.andb(set, mask) <> 0w0
                then raise Thread "The same attribute appears more than once in the list"
                else convert(r, acc, Word.orb(set, mask))

            and convert([], acc, set) = (acc, set)
              | convert(EnableBroadcastInterrupt true :: r, acc, set) =
                    checkRepeat(r, Word.orb(acc, 0w1), set, 0w1)
              | convert(EnableBroadcastInterrupt false :: r, acc, set) =
                    checkRepeat(r, acc (* No bit *), set, 0w1)
              | convert(InterruptState s :: r, acc, set) =
                    checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6)
              | convert(MaximumMLStack _ :: r, acc, set) =
                    convert(r, acc, set)
        in
            convert(at, 0w0, 0w0)
        end

        and setIstateBits InterruptDefer = 0w0
          | setIstateBits InterruptSynch = 0w2
          | setIstateBits InterruptAsynch = 0w4
          | setIstateBits InterruptAsynchOnce = 0w6

        fun getIstateBits(w: Word.word): interruptState =
        let
            val ibits = Word.andb(w, 0w6)
        in
            if ibits = 0w0
            then InterruptDefer
            else if ibits = 0w2
            then InterruptSynch
            else if ibits = 0w4
            then InterruptAsynch
            else InterruptAsynchOnce
        end

        fun wordToAttrs w =
        let
            (* Enable broadcast - true if bottom bit is set. *)
            val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1)
        in
            [bcast, InterruptState(getIstateBits w)]
        end

        exception Interrupt = RunCall.Interrupt

        (* The thread id is opaque outside this structure but is actually a six
           word mutable object.
           Word 0: Index into thread table (used inside the RTS only)
           Word 1: Flags: initialised by the RTS and set by this code
           Word 2: Thread local store: read and set by this code.
           Word 3: IntRequest: Set by the RTS if there is an interrupt pending
           Word 4: Maximum ML stack size.
                   Unlimited is stored here as zero
           *)
        val threadIdFlags       = 0w1
        and threadIdThreadLocal = 0w2
        and threadIdIntRequest  = 0w3
        and threadIdStackSize   = 0w4

        fun getLocal (t: 'a Universal.tag) : 'a option =
        let
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] = NONE
              | doFind ((ref v)::r) =
                    if Universal.tagIs t v
                    then SOME(Universal.tagProject t v)
                    else doFind r
        in
            doFind root
        end

        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
        let
            (* See if we already have this in the list. *)
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] =
                    (* Not in the list - Add it. *)
                    RunCall.storeWord
                        (self(), threadIdThreadLocal,
                         ref (Universal.tagInject t newVal) :: root)
              | doFind (v::r) =
                    if Universal.tagIs t (!v)
                        (* If it's in the list update it. *)
                    then v := Universal.tagInject t newVal
                    else doFind r

        in
            doFind root
        end

        local
            val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt"
        in
            fun testInterrupt() =
                (* If there is a pending request the word in the thread object
                   will be non-zero.
                *)
                if RunCall.loadWord(self(), threadIdIntRequest) <> 0
                then threadTestInterrupt()
                else ()
        end

        val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf"
        and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive"
        and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt"

        local
            fun getAttrWord (me: thread) : Word.word =
                RunCall.loadWord(me, threadIdFlags)

            fun getStackSizeAsInt (me: thread) : int =
                RunCall.loadWord(me, threadIdStackSize)

            and getStackSize me : int option =
                case getStackSizeAsInt me of
                    0 => NONE
                  | s => SOME s

            fun newStackSize ([], default) = default
              | newStackSize (MaximumMLStack NONE :: _, _) = 0
              | newStackSize (MaximumMLStack (SOME n) :: _, _) =
                    if n <= 0 then raise Thread "The stack size must be greater than zero" else n
              | newStackSize (_ :: l, default) = newStackSize (l, default)

            val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize"
        in
            (* Set attributes.  Only changes the values that are specified.  The
               others remain the same. *)
            fun setAttributes (attrs: threadAttribute list) : unit =
            let
                val me = self()
                val oldValues: Word.word = getAttrWord me
                val (newValue, mask) = attrsToWord attrs
                val stack = newStackSize(attrs, getStackSizeAsInt me)
            in
                RunCall.storeWord (self(), threadIdFlags,
                    Word.orb(newValue, Word.andb(Word.notb mask, oldValues)));
                if stack = getStackSizeAsInt me
                then () else threadMaxStackSize stack;
                (* If we are now handling interrupts asynchronously check whether
                   we have a pending interrupt now.  This will only be effective
                   if we were previously handling them synchronously or blocking
                   them.
                *)
                if Word.andb(newValue, 0w4) = 0w4
                then testInterrupt()
                else ()
            end

            fun getAttributes() : threadAttribute list =
            let
                val me = self()
            in
                MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me)
            end

            (* These are used in the ConditionVar structure.  They affect only the
               interrupt handling bits. *)
            fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self()))
            and setInterruptState(s: interruptState): unit =
                RunCall.storeWord (self(), threadIdFlags,
                    Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self()))))

            local
                (* The default for a new thread is to ignore broadcasts and handle explicit
                   interrupts synchronously. *)
                val (defaultAttrs, _) =
                    attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch]
                val threadForkFunction:
                    (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread"
            in
                fun fork(f:unit->unit, attrs: threadAttribute list): thread =
                let
                    (* Any attributes specified explicitly override the defaults. *)
                    val (attrWord, mask) = attrsToWord attrs
                    val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs))
                    val stack = newStackSize(attrs, 0 (* Default is unlimited *))
                    (* Run the function and exit whether it returns normally or raises an exception. *)
                    fun threadFunction () = (f() handle _ => ()) before exit()
                in
                    threadForkFunction(threadFunction, attrValue, stack)
                end
            end
        end

        local
            (* Send an interrupt to a thread.  If it returns false
               the thread did not exist and this should raise an exception.
            *)
            val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread"
        in
            fun interrupt(t: thread) =
                if threadSendInterrupt t
                then ()
                else raise Thread "Thread does not exist"
        end

        local
            val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread"
        in
            fun kill(t: thread) =
                if threadKillThread t
                then ()
                else raise Thread "Thread does not exist"
        end

        val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors"

        local
            val numberOfPhysical: unit -> int =
                RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors"
        in
            fun numPhysicalProcessors(): int option =
                (* It is not always possible to get this information *)
                case numberOfPhysical() of 0 => NONE | n => SOME n
        end
    end

    structure Mutex =
    struct
        type mutex = Word.word ref
        val mutex = LibrarySupport.volatileWordRef (* Initially 0=unlocked. *)
        open Thread  (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *)

        val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock"
        val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock"

        (* A mutex is implemented as a Word.word ref.  It is initially set to 0 and locked
           by atomically incrementing it.  If it was previously unlocked the result will
           be one but if it was already locked it will be some value greater than one.  When it
           is unlocked it is atomically decremented.  If there was no contention the result
           will again be 0 but if some other thread tried to lock it the result will be
           one or more.  In that case the unlocking thread needs to call in to the
           RTS to wake up the blocked thread.

           The cost of contention on the lock is very high.  To try to avoid this we
           first loop (spin) to see if we can get the lock without contention.
        *)

        val spin_cycle = 20000
        fun spin (m: mutex, c: int) =
           if ! m = 0w0 then ()
           else if c = spin_cycle then ()
           else spin(m, c+1);

        fun lock (m: mutex): unit =
        let
            val () = spin(m, 0)
            val newValue = atomicIncr m
        in
            if newValue = 0w1
            then () (* We've acquired the lock. *)
            else (* It's locked.  We return when we have the lock. *)
            (
                threadMutexBlock m;
                lock m (* Try again. *)
            )
        end

        fun unlock (m: mutex): unit =
        let
            val newValue = atomicDecr m
        in
            if newValue = 0w0
            then () (* No contention. *)
            else
                (* Another thread has blocked and we have to release it.  We can safely
                   set the value to 0 here to release the lock.  If another thread
                   acquires it before we have woken up the other threads that's fine.
                   Equally, if another thread incremented the count and saw it was
                   still locked it will enter the RTS and try to acquire the lock
                   there.
                   It's probably better to reset it here rather than within the RTS
                   since it allows another thread to acquire the lock immediately
                   rather than after the rather long process of entering the RTS.
                   Resetting this needs to be atomic with respect to atomic increment
                   and decrement.  That's not a problem on X86 so a simple assignment
                   is sufficient but in the interpreter at least it's necessary to
                   acquire a lock. *)
            (
                atomicReset m;
                threadMutexUnlock m
            )
        end

        (* Try to lock the mutex.  If it was previously unlocked then lock it and
           return true otherwise return false.  Because we don't block here there is
           the possibility that the thread that has locked it could release the lock
           shortly afterwards.  The check for !m = 0w0 is an optimisation and nearly
           all the time it avoids the call to atomicIncr setting m to a value > 1.
           There is a small chance that another thread could lock the mutex between the
           test for !m = 0w0 and the atomicIncr.
           In that case the atomicIncr would
           return a value > 1 and the function that locked the mutex will have to
           call into the RTS to reset it when it is unlocked.  *)
        fun trylock (m: mutex): bool =
            if !m = 0w0 andalso atomicIncr m = 0w1
            then true (* We've acquired the lock. *)
            else false (* The lock was taken. *)
    end

    structure ConditionVar =
    struct
        open Thread

        (* A condition variable contains a lock and a list of suspended threads. *)
        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
        fun conditionVar(): conditionVar =
            { lock = Mutex.mutex(), threads = LibrarySupport.volatileListRef() }

        local
            val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait"
            and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil"
        in
            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val me = self() (* My thread id. *)

                fun waitAgain() =
                let
                    fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t

                    fun removeThis [] = raise Fail "Thread missing in list"
                     |  removeThis (h::t) = if equal(h, me) then t else h :: removeThis t

                    val () =
                        case t of
                            SOME time => threadCondVarWaitUntil(lock, time)
                        |   NONE => threadCondVarWait lock

                    val () = Mutex.lock lock (* Get the lock again.  *)

                    (* Are we still on the list?  If so we haven't been explicitly woken
                       up.  We've either timed out, been interrupted or simply returned
                       because the RTS needed to process some asynchronous results.  *)
                    val stillThere = doFind(!threads)
                    open Time (* For >= *)
                in
                    if not stillThere
                    then (* We're done. *)
                    (
                        Mutex.unlock lock;
                        true
                    )
                    else if (case t of NONE => false | SOME t => Time.now() >= t)
                    then (* We've timed out. *)
                    (
                        threads := removeThis(!
                                               threads);
                        Mutex.unlock lock;
                        false
                    )
                    else
                    (
                        (* See if we've been interrupted.  If so remove ourselves
                           and exit. *)
                        testInterrupt()
                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
                        (* Otherwise just keep waiting. *)
                        waitAgain()
                    )
                end
            in
                Mutex.lock lock; (* Lock the internal mutex. *)
                Mutex.unlock m; (* Unlock the external mutex *)
                threads := me :: !threads; (* Add ourselves to the list. *)
                waitAgain() (* Wait and return the result when we're done. *)
            end

            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val originalIntstate = getInterruptState()
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them. *)
                val () =
                    if originalIntstate = InterruptDefer
                    then ()
                    else setInterruptState InterruptSynch;

                (* Wait for the condition.  If it raises an exception we still
                   need to reacquire the lock unless we were handling interrupts
                   asynchronously. *)
                val result =
                    innerWait(c, m, t) handle exn =>
                        (
                            (* We had an exception.  If we were handling interrupts synchronously
                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
                               counts as a single asynchronous exception and we restore the
                               state as InterruptSynch. *)
                            case originalIntstate of
                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
                            |   InterruptSynch => Mutex.lock m
                            |   InterruptAsynch => setInterruptState InterruptAsynch
                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;

                            raise exn (* Reraise the exception*)
                        )
            in
                (* Restore the original interrupt state first. *)
                setInterruptState originalIntstate;
                (* Normal return.  Reacquire the lock before returning.
                *)
                Mutex.lock m;
                result
            end

            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
                (doWait(c, m, NONE); ())
            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
                doWait(c, m, SOME t)
        end

        local
            (* This call wakes up the specified thread.  If the thread has already been
               interrupted and is not ignoring interrupts it returns false.  Otherwise
               it wakes up the thread and returns true.  We have to use this because
               we define that if a thread is interrupted before it is signalled then
               it raises Interrupt. *)
            val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake"

            (* Wake a single thread if we can (signal). *)
            fun wakeOne [] = []
            |   wakeOne (thread::rest) =
                    if threadCondVarWake thread
                    then rest
                    else thread :: wakeOne rest
            (* Wake all threads (broadcast). *)
            fun wakeAll [] = [] (* Always returns the empty list. *)
            |   wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest)

            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
            let
                val originalState = getInterruptState()
            in
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them.  We need to do this to avoid an asynchronous
                   interrupt which could leave the internal lock in an inconsistent state. *)
                if originalState = InterruptDefer
                then ()
                else setInterruptState InterruptSynch;
                (* Get the condition var lock. *)
                Mutex.lock lock;
                threads := wakeThreads(! threads);
                Mutex.unlock lock;
                setInterruptState originalState; (* Restore original state. *)
                (* Test if we were interrupted while we were handling
                   interrupts synchronously.
                *)
                if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce
                then testInterrupt()
                else ()
            end
        in
            fun signal cv = signalOrBroadcast(cv, wakeOne)
            and broadcast cv = signalOrBroadcast(cv, wakeAll)
        end
    end
end;

local
    fun prettyMutex _ _ (_: Thread.Mutex.mutex) = PolyML.PrettyString "?"
    and prettyThread _ _ (_: Thread.Thread.thread) = PolyML.PrettyString "?"
    and prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = PolyML.PrettyString "?"
in
    val () = PolyML.addPrettyPrinter prettyMutex
    and () = PolyML.addPrettyPrinter prettyThread
    and () = PolyML.addPrettyPrinter prettyCondVar
end;
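
(* Usage sketch, not part of the library and kept inside a comment so that
   it is not compiled with this file.  The comment at the head of the file
   notes that there is no join function and that one can be written using
   mutexes and condition variables.  The following is one possible way of
   doing that, assuming it is entered at the top level after the Thread
   structure has been loaded.  The name forkJoinable and the aliases T, M
   and C are hypothetical and exist only for this illustration.

    local
        structure T = Thread.Thread
        structure M = Thread.Mutex
        structure C = Thread.ConditionVar
    in
        (* Fork a thread to run f and return a function that waits for it.
           The returned function yields SOME result, or NONE if f raised
           an exception. *)
        fun forkJoinable (f: unit -> 'a) : unit -> 'a option =
        let
            val m = M.mutex ()
            val c = C.conditionVar ()
            val finished = ref false
            val result : 'a option ref = ref NONE

            fun body () =
            (
                (result := SOME (f ())) handle _ => ();
                (* Update the state with the mutex held, then wake waiters. *)
                M.lock m;
                finished := true;
                M.unlock m;
                C.broadcast c
            )

            val _ = T.fork (body, [])

            fun join () =
            (
                M.lock m;
                (* Re-test the flag after every wake-up with the lock held. *)
                while not (!finished) do C.wait (c, m);
                M.unlock m;
                !result
            )
        in
            join
        end
    end

    val join = forkJoinable (fn () => 6 * 7)
    val answer = join ()    (* SOME 42 once the worker has finished *)

   The sketch uses broadcast rather than signal so that several threads
   may call the returned function.  The forked thread takes the default
   attributes, so it handles interrupts synchronously and ignores
   broadcast interrupts.
*)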