1(*
2    Title:      Thread package for ML.
3    Author:     David C. J. Matthews
4    Copyright (c) 2007-2014, 2018-20
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License version 2.1 as published by the Free Software Foundation.
9    
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14    
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18*)
19
20(* This signature and structure are not part of the standard basis library
21   but are included here because they depend on the Time structure and are
22   in turn dependencies of the BasicIO structure. *)
23
24(*!Earlier versions of Poly/ML have provided a form of concurrent execution through 
25  the Process structure. Version 5.1 introduces 
26  new thread primitives in the Thread structure. This structure is modelled on 
27  the Posix thread (pthread) package but simplified and modified for ML. The aim 
28  is to provide an efficient implementation of parallelism particularly to enable 
29  ML programs to make use of multi-core processors while minimising the changes 
30  needed to existing code. The Process structure will continue to be available 
31  as a library written on top of these primitives but new programs should use 
32  the Thread structure directly.
33  
34The thread package differs from pthreads in a number of ways. 
35There is no join function to wait for the completion of a thread. 
36This can be written using mutexes and condition variables. 
37Cancellation and signal handling are combined into the interrupt
38functions. (The Poly/ML Signal structure handles signals for all the
39threads together).  The effect of explicit cancellation is achieved
40using the interrupt function.  This causes an interrupt to be
41generated in a specific thread.  Alternatively an interrupt can be
42broadcast to all threads.  This is most likely to be used
43interactively to kill threads that appear to have gone out of
44control.  The normal top-level handler for a console interrupt will
45generate this.  Threads can choose how or whether they respond to
46these interrupts.  A thread that is doing processor-intensive work
47probably needs to be able to be interrupted asynchronously whereas if
48it is communicating with other threads the presence of asynchronous
49interrupts makes correct programming difficult.
50*)
51
52signature THREAD =
53sig
54    (*!The Thread exception can be raised by various of the functions in the
55       structure if they detect an error.*)
56    exception Thread of string (* Raised if an operation fails. *)
57    
    structure Thread:
    sig
        (*!The type of a thread identifier.*)
        eqtype thread
        
        (* Thread attributes - This may be extended. *)
        (*!The type of a thread attribute. Thread attributes are
            properties of the thread that are set initially when the thread is
            created but can subsequently be modified by the thread itself.  The
            thread attribute type may be extended in the future to include things
            like scheduling priority. The current thread attributes control the
            way interrupt exceptions are delivered to the thread.
            
            `EnableBroadcastInterrupt` controls whether the thread will receive an interrupt sent using
            `broadcastInterrupt` or as a result of pressing the console interrupt
            key. If this is false the thread will not receive them.  The default
            for a new thread if this is not specified is false.
            
            `InterruptState` controls when and whether interrupts are delivered to the 
            thread. This includes broadcast interrupts and also interrupts directed at 
            a specific thread with the interrupt call.
            `InterruptDefer` means the thread 
            will not receive any interrupts. However, if the thread has previously been 
            interrupted the interrupt may be delivered when the thread calls setAttributes 
            to change its interrupt state. `InterruptSynch`
            means interrupts are delivered 
            synchronously. An interrupt will be delayed until an interruption point. An 
            interruption point is one of: `testInterrupt`,
            `ConditionVar.wait`, `ConditionVar.waitUntil`
            and various library calls that may block, such as IO calls, pause etc. N.B. 
            `Mutex.lock` is not an interruption point even though it can result in a thread 
            blocking for an indefinite period. `InterruptAsynch` means interrupts are delivered 
            asynchronously i.e. at a suitable point soon after they are triggered.
            `InterruptAsynchOnce`
            means that only a single interrupt is delivered asynchronously after which 
            the interrupt state is changed to `InterruptSynch`. It allows a thread to tidy 
            up and if necessary indicate that it has been interrupted without the risk 
            of a second asynchronous interrupt occurring in the handler for the first 
            interrupt. If this attribute is not specified when a thread is created the 
            default is `InterruptSynch`.
            
            `MaximumMLStack` was added in version 5.5.3. It controls the maximum size the 
            ML stack may grow to. It is an option type where NONE allows the stack to 
            grow to the limit of the available memory whereas SOME n limits the stack 
            to n words. This is approximate since there is some rounding involved. When 
            the limit is reached the thread is sent an Interrupt exception.
            A limit of SOME n where n is not greater than zero is rejected with the
            Thread exception.*)
        datatype threadAttribute =
            (* Does this thread accept a broadcast interrupt?  The default is not to
               accept broadcast interrupts. *)
            EnableBroadcastInterrupt of bool
            (* How to handle interrupts.  The default is to handle interrupts synchronously.  *)
        |   InterruptState of interruptState
            (* Maximum size of the ML stack in words. NONE means unlimited *)
        |   MaximumMLStack of int option
        
        and interruptState =
            InterruptDefer (* Defer any interrupts. *)
        |   InterruptSynch  (* Interrupts are delivered synchronously. An interrupt
               will be delayed until an interruption point.  An interruption point
               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
               and various library calls that may block, such as IO calls, pause etc.
               N.B.  Mutex.lock is not an interruption point even though it can result
               in a thread blocking for an indefinite period.  *)
        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
               point soon after they are triggered. *)
        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
               is delivered asynchronously after which the interrupt state is changed to
               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
               that it has been interrupted without the risk of a second asynchronous
               interrupt occurring in the handler for the first interrupt. *)
        
        (*!Fork a thread. Starts a new thread running 
          the function argument. The attribute list gives initial values for thread attributes 
          which can be modified by the thread itself. Any unspecified attributes take 
          default values. The thread is terminated when the thread function returns, if 
          it raises an uncaught exception or if it calls `exit`.
          Raises Thread if `EnableBroadcastInterrupt` or `InterruptState` appears more
          than once in the list or if a `MaximumMLStack` limit is not greater than zero.*)
        val fork: (unit->unit) * threadAttribute list -> thread

        (*!Terminate this thread. *)
        val exit: unit -> unit
        (*!Test if a thread is still running or has terminated.  This function should be
          used with care.  The thread may be on the point of terminating and still appear
          to be active.*)
        val isActive: thread -> bool
        
        (*!Test whether thread ids are the same.  This is provided for backwards compatibility
          since `thread` is an eqtype. *)
        val equal: thread * thread -> bool
        (*!Return the thread identifier for the current thread. *)
        val self: unit -> thread
        
        exception Interrupt (* = SML90.Interrupt *)
        (*!Send an Interrupt exception to a specific thread.  When and indeed whether
           the exception is actually delivered will depend on the interrupt state
           of the target thread.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val interrupt: thread -> unit
        (*!Send an interrupt exception to every thread which is set to accept it. *)
        val broadcastInterrupt: unit -> unit
        (*!If this thread is handling interrupts synchronously, test to see 
           if it has been interrupted.  If so it raises the
           `Interrupt` exception. *)
        val testInterrupt: unit -> unit
        (*!Terminate a thread. This should be used as a last resort.  Normally
           a thread should be allowed to clean up and terminate by using the
           interrupt call.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val kill: thread -> unit
        
        (*!Get and set thread-local store for the calling thread. The store is a
           tagged associative memory which is initially empty for a new thread.
           A thread can call setLocal to add or replace items in its store and
           call getLocal to return values if they exist.  The Universal structure
           contains functions to make new tags as well as injection, projection and
           test functions. *)
        val getLocal: 'a Universal.tag -> 'a option
        and setLocal: 'a Universal.tag * 'a -> unit
        
        (*!Change the specified attribute(s) for the calling thread.  Unspecified
           attributes remain unchanged.  If the new interrupt state is
           `InterruptAsynch` or `InterruptAsynchOnce` the thread checks for a
           pending interrupt before returning.
           Raises Thread if `EnableBroadcastInterrupt` or `InterruptState` appears more
           than once in the list or if a `MaximumMLStack` limit is not greater than zero. *)
        val setAttributes: threadAttribute list -> unit
        (*!Get the current values of the attributes of the calling thread.  The
           result contains one entry for each of `MaximumMLStack`,
           `EnableBroadcastInterrupt` and `InterruptState`. *)
        val getAttributes: unit -> threadAttribute list

        (*!`numProcessors` returns the number of processors that will be used to
           run threads.  `numPhysicalProcessors` returns the number of physical
           processors, or NONE if that information is not available. *)
        val numProcessors: unit -> int
        and numPhysicalProcessors: unit -> int option
    end
189        
    structure Mutex:
    sig
        (*!A mutex provides simple mutual exclusion.  A thread can lock
           a mutex and until it unlocks it no other thread will be able to lock it.
           Locking and unlocking are intended to be fast in the situation when
           there is no other thread attempting to lock the mutex.
           These functions may not work correctly if an asynchronous interrupt
           is delivered during the calls.  A thread should use synchronous interrupt
           handling when using these calls. *)
        type mutex
        (*!Make a new mutex.  The mutex is initially unlocked. *)
        val mutex: unit -> mutex
        (*!Lock a mutex.  If the mutex is currently locked the thread is
           blocked until it is unlocked.  If a thread tries to lock a mutex that
           it has previously locked the thread will deadlock.
           N.B.  `lock` is not an interruption point
           (a point where synchronous
           interrupts are delivered) even though a thread can be blocked indefinitely. *)
        val lock: mutex -> unit
        (*!Unlock a mutex and allow any waiting threads to run.  The behaviour
           if the mutex was not previously locked by the calling thread is undefined.  *)
        val unlock: mutex -> unit
        (*!Attempt to lock the mutex without blocking.  Returns true if the mutex was not
           previously locked and has now been locked by the calling thread.  Returns
           false if the mutex was previously locked, including by the calling thread. *)
        val trylock: mutex -> bool
        
    end
218    
    structure ConditionVar:
    sig
        (*!Condition variables are used to provide communication
           between threads.  A condition variable is used in conjunction with a mutex
           and usually a reference to establish and test changes in state.  The normal
           use is for one thread to lock a mutex, test the reference and then wait on
           the condition variable, releasing the lock on the mutex while it does so.
           Another thread may then lock the mutex, update the reference, unlock the
           mutex, and signal the condition variable.  This wakes up the first thread
           and reacquires the lock allowing the thread to test the updated reference
           with the lock held.
           More complex communication mechanisms, such as blocking channels, can
           be written in terms of condition variables. *)
        type conditionVar
        (*!Make a new condition variable. *)
        val conditionVar: unit -> conditionVar
        (*!Release the mutex and block until the condition variable is signalled. When 
            wait returns the mutex will have been re-acquired.
            
            If the thread is handling interrupts synchronously this function can be interrupted 
            using the `Thread.interrupt` function or, if the thread is set to 
            accept broadcast interrupts, `Thread.broadcastInterrupt`. The thread 
            will re-acquire the mutex before the exception is delivered. An exception 
            will only be delivered in this case if the interrupt is sent before the condition 
            variable is signalled. If the interrupt is sent after the condition variable 
            is signalled the function will return normally even if it has not yet re-acquired 
            the mutex. The interrupt will then be delivered on the next call to `wait`, 
            `Thread.testInterrupt` or other blocking call.
            
            A thread should never call this function if it may receive an asynchronous 
            interrupt. It should always set its interrupt state to either
            `InterruptSynch` 
            or `InterruptDefer` beforehand.
            An asynchronous interrupt may leave the condition 
            variable and the mutex in an indeterminate state and could lead to deadlock.
            
            A condition variable should only be associated with one mutex at a time. 
            All the threads waiting on a condition variable should pass the same mutex 
            as argument.*)
        val wait: conditionVar * Mutex.mutex -> unit
        (*!As wait except that it blocks until either the condition
           variable is signalled or the time (absolute) is reached.  Either way
           the mutex is reacquired so there may be a further delay if it is held
           by another thread.  The result is true if the thread was woken
           because the condition variable was signalled.
           NOTE(review): presumably the result is false if the time was reached
           first - confirm against the implementation. *)
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        (*!Wake up one thread if any are waiting on the condition variable. 
          If there are several threads waiting for the condition variable one will be 
          selected to run and will run as soon as it has re-acquired the lock.*)
        val signal: conditionVar -> unit
        (*!Wake up all threads waiting on the condition variable. *)
        val broadcast: conditionVar -> unit
    end
271
272end;
273
274structure Thread :> THREAD =
275struct
276    exception Thread = RunCall.Thread
277
278    structure Thread =
279    struct
280        open Thread (* Created in INITIALISE with thread type and self function. *)
281
282        (* Equality is pointer equality. *)
283        val equal : thread*thread->bool = op =
284
        (* Concrete definitions of the attribute types declared in the THREAD
           signature.  attrsToWord and setIstateBits give the encoding of
           EnableBroadcastInterrupt and InterruptState as bits of the flags word;
           MaximumMLStack is handled separately from the flags. *)
        datatype threadAttribute =
            EnableBroadcastInterrupt of bool
        |   InterruptState of interruptState
        |   MaximumMLStack of int option
        
        (* The four interrupt-handling modes of a thread. *)
        and interruptState =
            InterruptDefer
        |   InterruptSynch
        |   InterruptAsynch
        |   InterruptAsynchOnce 
295
296        (* Convert attributes to bits and a mask. *)
297        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
298        let
299            (* Check that a particular attribute appears only once.
300               As well as accumulating the actual bits in the result we
301               also accumulate the mask of bits.  If any of these
302               reappear we raise an exception. *)
303            fun checkRepeat(r, acc, set, mask) =
304                if Word.andb(set, mask) <> 0w0
305                then raise Thread "The same attribute appears more than once in the list"
306                else convert(r, acc,  Word.orb(set, mask))
307
308            and convert([], acc, set) = (acc, set)
309              | convert(EnableBroadcastInterrupt true :: r, acc, set) =
310                    checkRepeat(r, Word.orb(acc, 0w1), set, 0w1)
311              | convert(EnableBroadcastInterrupt false :: r, acc, set) =
312                    checkRepeat(r, acc (* No bit *), set, 0w1)
313              | convert(InterruptState s :: r, acc, set) =
314                    checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6)
315              | convert(MaximumMLStack _ :: r, acc, set) =
316                    convert(r, acc, set)
317        in
318            convert(at, 0w0, 0w0)
319        end
320        
321        and setIstateBits InterruptDefer = 0w0
322          | setIstateBits InterruptSynch = 0w2
323          | setIstateBits InterruptAsynch = 0w4
324          | setIstateBits InterruptAsynchOnce = 0w6
325
326        fun getIstateBits(w: Word.word): interruptState =
327        let
328            val ibits = Word.andb(w, 0w6)
329        in
330            if ibits = 0w0
331            then InterruptDefer
332            else if ibits = 0w2
333            then InterruptSynch
334            else if ibits = 0w4
335            then InterruptAsynch
336            else InterruptAsynchOnce
337        end
338
339        fun wordToAttrs w =
340        let
341            (* Enable broadcast - true if bottom bit is set. *)
342            val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1)
343        in
344            [bcast, InterruptState(getIstateBits w)]
345        end
346        
347        exception Interrupt = RunCall.Interrupt
348
        (* The thread id is opaque outside this structure but is actually a six
           word mutable object.
           Word 0: Index into thread table (used inside the RTS only)
           Word 1: Flags: initialised by the RTS and set by this code
           Word 2: Thread local store: read and set by this code.
           Word 3: IntRequest: Set by the RTS if there is an interrupt pending
           Word 4: Maximum ML stack size.  Unlimited is stored here as zero
           NOTE(review): the text says six words but only words 0-4 are
           described here - confirm the layout against the RTS.
           The values below are the word offsets passed to RunCall.loadWord and
           RunCall.storeWord when accessing the thread object.
           *)
        val threadIdFlags       = 0w1
        and threadIdThreadLocal = 0w2
        and threadIdIntRequest  = 0w3
        and threadIdStackSize   = 0w4
361
362        fun getLocal (t: 'a Universal.tag) : 'a option =
363        let
364            val root: Universal.universal ref list =
365                RunCall.loadWord(self(), threadIdThreadLocal)
366
367            fun doFind [] = NONE
368              | doFind ((ref v)::r) =
369                    if Universal.tagIs t v
370                    then SOME(Universal.tagProject t v)
371                    else doFind r
372        in
373            doFind root
374        end
375        
376        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
377        let
378            (* See if we already have this in the list. *)
379            val root: Universal.universal ref list =
380                RunCall.loadWord(self(), threadIdThreadLocal)
381
382            fun doFind [] =
383                    (* Not in the list - Add it. *)
384                    RunCall.storeWord
385                        (self(), threadIdThreadLocal,
386                         ref (Universal.tagInject t newVal) :: root)
387              | doFind (v::r) =
388                    if Universal.tagIs t (!v)
389                        (* If it's in the list update it. *)
390                    then v := Universal.tagInject t newVal
391                    else doFind r
392
393        in
394            doFind root
395        end
396        
397        local
398            val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt"
399        in
400            fun testInterrupt() =
401                (* If there is a pending request the word in the thread object
402                   will be non-zero. *)
403                if RunCall.loadWord(self(), threadIdIntRequest) <> 0
404                then threadTestInterrupt()
405                else ()
406        end
407
        (* These three operations are bound directly to entry points in the
           run-time system; there is no ML code involved beyond the call. *)
        val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf"
        and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive"
        and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt"
411
412        local
413            fun getAttrWord (me: thread) : Word.word =
414                RunCall.loadWord(me, threadIdFlags)
415
416            fun getStackSizeAsInt (me: thread) : int =
417                RunCall.loadWord(me, threadIdStackSize)
418
419            and getStackSize me : int option =
420                case getStackSizeAsInt me of
421                    0 => NONE
422                |   s => SOME s
423
424            fun newStackSize ([], default) = default
425            |   newStackSize (MaximumMLStack NONE :: _, _) = 0
426            |   newStackSize (MaximumMLStack (SOME n) :: _, _) =
427                    if n <= 0 then raise Thread "The stack size must be greater than zero" else n
428            |   newStackSize (_ :: l, default) = newStackSize (l, default)
429            
            (* RTS entry point called by setAttributes when the requested stack
               limit differs from the one currently stored for the thread. *)
            val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize"
431        in
432            (* Set attributes.  Only changes the values that are specified.  The
433               others remain the same. *)
434            fun setAttributes (attrs: threadAttribute list) : unit =
435            let
436                val me = self()
437                val oldValues: Word.word = getAttrWord me
438                val (newValue, mask) = attrsToWord attrs
439                val stack = newStackSize(attrs, getStackSizeAsInt me)
440            in
441                RunCall.storeWord (self(), threadIdFlags,
442                    Word.orb(newValue, Word.andb(Word.notb mask, oldValues)));
443                if stack = getStackSizeAsInt me
444                then () else threadMaxStackSize stack;
445                (* If we are now handling interrupts asynchronously check whether
446                   we have a pending interrupt now.  This will only be effective
447                   if we were previously handling them synchronously or blocking
448                   them. *)
449                if Word.andb(newValue, 0w4) = 0w4
450                then testInterrupt()
451                else ()
452            end
453                
454            fun getAttributes() : threadAttribute list =
455            let
456                val me = self()
457            in
458                MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me)
459            end
460
461            (* These are used in the ConditionVar structure.  They affect only the
462               interrupt handling bits. *)
463            fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self()))
464            and setInterruptState(s: interruptState): unit =
465                RunCall.storeWord (self(), threadIdFlags,
466                    Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self()))))
467
468            local
469                (* The default for a new thread is to ignore broadcasts and handle explicit
470                   interrupts synchronously. *)
471                val (defaultAttrs, _) =
472                    attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch]
473                val threadForkFunction:
474                    (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread"
475            in
476                fun fork(f:unit->unit, attrs: threadAttribute list): thread =
477                let
478                    (* Any attributes specified explicitly override the defaults. *)
479                    val (attrWord, mask) = attrsToWord attrs
480                    val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs))
481                    val stack = newStackSize(attrs, 0 (* Default is unlimited *))
482                    (* Run the function and exit whether it returns normally or raises an exception. *)
483                    fun threadFunction () = (f() handle _ => ()) before exit()
484                in
485                    threadForkFunction(threadFunction, attrValue, stack)
486                end
487            end
488        end
489
490        local
491            (* Send an interrupt to a thread.  If it returns false
492               the thread did not exist and this should raise an exception. *)
493            val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread"
494        in
495            fun interrupt(t: thread) =
496                if threadSendInterrupt t
497                then ()
498                else raise Thread "Thread does not exist"
499        end
500
501        local
502            val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread"
503        in
504            fun kill(t: thread) =
505                if threadKillThread t
506                then ()
507                else raise Thread "Thread does not exist"
508        end
509
        (* Bound directly to the RTS entry point that reports the processor count. *)
        val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors"
511
512        local
513            val numberOfPhysical: unit -> int =
514                RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors"
515        in
516            fun numPhysicalProcessors(): int option =
517                (* It is not always possible to get this information *)
518                case numberOfPhysical() of 0 => NONE | n => SOME n
519        end
520    end
521    
522    structure Mutex =
523    struct
        (* A mutex is represented by a reference to a word; see the comment
           before spin_cycle for how the value is used. *)
        type mutex = Word.word ref
        (* Creating a mutex is just creating a new volatile word reference. *)
        val mutex = LibrarySupport.volatileWordRef (* Initially 0=unlocked. *)
        open Thread  (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *)
        
        (* RTS entry points used only on the contended paths: lock calls
           threadMutexBlock when it fails to take the mutex and unlock calls
           threadMutexUnlock when it finds that another thread tried to take it. *)
        val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock"
        val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock"
530
531        (* A mutex is implemented as a Word.word ref.  It is initially set to 0 and locked
532           by atomically incrementing it.  If it was previously unlocked the result will
533           by one but if it was already locked it will be some positive value.  When it
534           is unlocked it is atomically decremented.  If there was no contention the result
535           will again be 0 but if some other thread tried to lock it the result will be
536           one or positive.  In that case the unlocking thread needs to call in to the
537           RTS to wake up the blocked thread.
538
539           The cost of contention on the lock is very high.  To try to avoid this we
540           first loop (spin) to see if we can get the lock without contention.  *)
541
542        val spin_cycle = 20000
543        fun spin (m: mutex, c: int) =
544           if ! m = 0w0 then ()
545           else if c = spin_cycle then ()
546           else spin(m, c+1);
547
548        fun lock (m: mutex): unit =
549        let
550            val () = spin(m, 0)
551            val newValue = atomicIncr m
552        in
553            if newValue = 0w1
554            then () (* We've acquired the lock. *)
555            else (* It's locked.  We return when we have the lock. *)
556            (
557                threadMutexBlock m;
558                lock m (* Try again. *)
559            )
560        end
561
562        fun unlock (m: mutex): unit =
563        let
564            val newValue = atomicDecr m
565        in
566            if newValue = 0w0
567            then () (* No contention. *)
568            else
569                (* Another thread has blocked and we have to release it.  We can safely
570                   set the value to 0 here to release the lock.  If another thread
571                   acquires it before we have woken up the other threads that's fine.
572                   Equally, if another thread incremented the count and saw it was
573                   still locked it will enter the RTS and try to acquire the lock
574                   there.
575                   It's probably better to reset it here rather than within the RTS
576                   since it allows another thread to acquire the lock immediately
577                   rather than after the rather long process of entering the RTS.
578                   Resetting this needs to be atomic with respect to atomic increment
579                   and decrement.  That's not a problem on X86 so a simple assignment
580                   is sufficient but in the interpreter at least it's necessary to
581                   acquire a lock. *)
582            (
583                atomicReset m;
584                threadMutexUnlock m
585            )
586        end
587
588        (* Try to lock the mutex.  If it was previously unlocked then lock it and
589           return true otherwise return false.  Because we don't block here there is
590           the possibility that the thread that has locked it could release the lock
591           shortly afterwards.  The check for !m = 0w0 is an optimisation and nearly
592           all the time it avoids the call to atomicIncr setting m to a value > 1.
593           There is a small chance that another thread could lock the mutex between the
594           test for !m = 0w0 and the atomicIncr.  In that case the atomicIncr would
595           return a value > 1 and the function that locked the mutex will have to
596           call into the RTS to reset it when it is unlocked.  *)
597        fun trylock (m: mutex): bool =
598            if !m = 0w0 andalso atomicIncr m = 0w1
599            then true (* We've acquired the lock. *)
600            else false (* The lock was taken. *)
601    end
602
603    structure ConditionVar =
604    struct
605        open Thread
606
607        (* A condition variable contains a lock and a list of suspended threads. *)
608        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
609        fun conditionVar(): conditionVar =
610            { lock = Mutex.mutex(), threads = LibrarySupport.volatileListRef() }
611
612        local
            (* RTS entry points used to block the calling thread.  Both are passed
               the condition variable's internal mutex; the timed form also takes
               the absolute time at which to stop waiting.
               NOTE(review): the caller re-locks that mutex after each call, so the
               RTS presumably releases it while the thread is blocked - confirm
               against the RTS source. *)
            val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait"
            and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil"
615        in
616            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
617            let
618                val me = self() (* My thread id. *)
619                
620                fun waitAgain() =
621                let
622                    fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t
623                    
624                    fun removeThis [] = raise Fail "Thread missing in list"
625                     |  removeThis (h::t) = if equal(h, me) then t else h :: removeThis t
626                     
627                    val () =
628                        case t of
629                            SOME time => threadCondVarWaitUntil(lock, time)
630                        |   NONE => threadCondVarWait lock
631
632                    val () = Mutex.lock lock (* Get the lock again.  *)
633                    
634                    (* Are we still on the list?  If so we haven't been explicitly woken
635                       up.  We've either timed out, been interrupted or simply returned
636                       because the RTS needed to process some asynchronous results.  *)
637                    val stillThere = doFind(!threads)
638                    open Time (* For >= *)
639                in
640                    if not stillThere
641                    then (* We're done. *)
642                    (
643                        Mutex.unlock lock;
644                        true
645                    )
646                    else if (case t of NONE => false | SOME t => Time.now() >= t)
647                    then (* We've timed out. *)
648                    (
649                        threads := removeThis(! threads);
650                        Mutex.unlock lock;
651                        false
652                    )
653                    else
654                    (
655                        (* See if we've been interrupted.  If so remove ourselves
656                           and exit. *)
657                        testInterrupt()
658                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
659                        (* Otherwise just keep waiting. *)
660                        waitAgain()
661                    )
662                end  
663            in
664                Mutex.lock lock; (* Lock the internal mutex. *)
665                Mutex.unlock m; (* Unlock the external mutex *)
666                threads := me :: !threads; (* Add ourselves to the list. *)
667                waitAgain() (* Wait and return the result when we're done. *)
668            end
669
670            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
671            let
672                val originalIntstate = getInterruptState()
673                (* Set this to handle interrupts synchronously unless we're already
674                   ignoring them. *)
675                val () =
676                    if originalIntstate = InterruptDefer
677                    then ()
678                    else setInterruptState InterruptSynch;
679                    
680                (* Wait for the condition.  If it raises an exception we still
681                   need to reacquire the lock unless we were handling interrupts
682                   asynchronously. *)
683                val result =
684                    innerWait(c, m, t) handle exn =>
685                        (
686                            (* We had an exception.  If we were handling exceptions synchronously
687                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
688                               counts as a single asynchronous exception and we restore the
689                               state as InterruptSynch. *)
690                            case originalIntstate of
691                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
692                            |   InterruptSynch => Mutex.lock m
693                            |   InterruptAsynch => setInterruptState InterruptAsynch
694                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;
695
696                            raise exn (* Reraise the exception*)
697                        )
698            in
699                (* Restore the original interrupt state first. *)
700                setInterruptState originalIntstate;
701                (* Normal return.  Reacquire the lock before returning. *)
702                Mutex.lock m;
703                result
704            end
705
706            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
707                (doWait(c, m, NONE); ())
708            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
709                doWait(c, m, SOME t)
710        end
711        
712        local
            (* RTS call that wakes up the specified thread.  If the thread has already
               been interrupted and is not ignoring interrupts it returns false.
               Otherwise it wakes up the thread and returns true.  We have to use this
               because we define that if a thread is interrupted before it is signalled
               then it raises Interrupt. *)
            val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake"
719            
720            (* Wake a single thread if we can (signal). *)
721            fun wakeOne [] = []
722            |   wakeOne (thread::rest) =
723                    if threadCondVarWake thread
724                    then rest
725                    else thread :: wakeOne rest
726            (* Wake all threads (broadcast). *)
727            fun wakeAll [] = [] (* Always returns the empty list. *)
728            |   wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest)
729            
730            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
731            let
732                val originalState = getInterruptState()
733            in
734                (* Set this to handle interrupts synchronously unless we're already
735                   ignoring them.  We need to do this to avoid an asynchronous
736                   interrupt which could leave the internal lock in an inconsistent state. *)
737                if originalState = InterruptDefer
738                then ()
739                else setInterruptState InterruptSynch;
740                (* Get the condition var lock. *)
741                Mutex.lock lock;
742                threads := wakeThreads(! threads);
743                Mutex.unlock lock;
744                setInterruptState originalState; (* Restore original state. *)
745                (* Test if we were interrupted while we were handling
746                   interrupts synchronously. *)
747                if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce
748                then testInterrupt()
749                else ()
750            end
751        in
752            fun signal cv = signalOrBroadcast(cv, wakeOne)
753            and broadcast cv = signalOrBroadcast(cv, wakeAll)
754        end
755    end
756end;
757
(* Install pretty printers for the thread types.  Each of them ignores its
   argument and prints the value as "?". *)
local
    val opaque = PolyML.PrettyString "?"

    fun prettyMutex _ _ (_: Thread.Mutex.mutex) = opaque
    fun prettyThread _ _ (_: Thread.Thread.thread) = opaque
    fun prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = opaque
in
    val () = PolyML.addPrettyPrinter prettyMutex
    val () = PolyML.addPrettyPrinter prettyThread
    val () = PolyML.addPrettyPrinter prettyCondVar
end;
767
768
769