1(*
2    Title:      Thread package for ML.
3    Author:     David C. J. Matthews
4    Copyright (c) 2007-2014, 2018
5
6    This library is free software; you can redistribute it and/or
7    modify it under the terms of the GNU Lesser General Public
8    License version 2.1 as published by the Free Software Foundation.
9    
10    This library is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    Lesser General Public License for more details.
14    
15    You should have received a copy of the GNU Lesser General Public
16    License along with this library; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18*)
19
20(* This signature and structure are not part of the standard basis library
21   but are included here because they depend on the Time structure and are
22   in turn dependencies of the BasicIO structure. *)
23
24signature THREAD =
25sig
26    exception Thread of string (* Raised if an operation fails. *)
27    
28    structure Thread:
29    sig
30        eqtype thread
31        
32        (* Thread attributes - This may be extended. *)
33        datatype threadAttribute =
34            (* Does this thread accept a broadcast interrupt?  The default is not to
35               accept broadcast interrupts. *)
36            EnableBroadcastInterrupt of bool
37            (* How to handle interrupts.  The default is to handle interrupts synchronously.  *)
38        |   InterruptState of interruptState
39            (* Maximum size of the ML stack in words. NONE means unlimited *)
40        |   MaximumMLStack of int option
41        
42        and interruptState =
43            InterruptDefer (* Defer any interrupts. *)
44        |   InterruptSynch  (* Interrupts are delivered synchronously. An interrupt
45               will be delayed until an interruption point.  An interruption point
46               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
47               and various library calls that may block, such as IO calls, pause etc.
48               N.B.  Mutex.lock is not an interruption point even though it can result
49               in a thread blocking for an indefinite period.  *)
50        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
51               point soon after they are triggered. *)
52        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
53               is delivered asynchronously after which the interrupt state is changed to
54               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
55               that it has been interrupted without the risk of a second asynchronous
56               interrupt occurring in the handler for the first interrupt. *)
57        
58        (* fork: Fork a thread.  Starts a new thread running the function argument.  The
59           attribute list gives initial values for thread attributes which can be
60           modified by the thread itself.  Any unspecified attributes take default values.
61           The thread is terminated when the thread function returns, if it
62           raises an uncaught exception or if it calls "exit". *)
63        val fork: (unit->unit) * threadAttribute list -> thread
64        (* exit: Terminate this thread. *)
65        val exit: unit -> unit
66        (* isActive: Test if a thread is still running or has terminated. *)
67        val isActive: thread -> bool
68        
69        (* Test whether thread ids are the same.  No longer needed if this is an eqtype. *)
70        val equal: thread * thread -> bool
71        (* Get my own ID. *)
72        val self: unit -> thread
73        
74        exception Interrupt (* = SML90.Interrupt *)
75        (* Send an Interrupt exception to a specific thread.  When and indeed whether
76           the exception is actually delivered will depend on the interrupt state
77           of the target thread.  Raises Thread if the thread is no longer running,
78           so an exception handler should be used unless the thread is known to
79           be blocked. *)
80        val interrupt: thread -> unit
81        (* Send an interrupt exception to every thread which is set to accept it. *)
82        val broadcastInterrupt: unit -> unit
83        (* If this thread is handling interrupts synchronously, test to see 
84           if it has been interrupted.  If so it raises the Interrupt
85           exception. *)
86        val testInterrupt: unit -> unit
87        (* Terminate a thread. This should be used as a last resort.  Normally
88           a thread should be allowed to clean up and terminate by using the
89           interrupt call.  Raises Thread if the thread is no longer running,
90           so an exception handler should be used unless the thread is known to
91           be blocked. *)
92        val kill: thread -> unit
93        
94        (* Get and set thread-local store for the calling thread. The store is a
95           tagged associative memory which is initially empty for a new thread.
96           A thread can call setLocal to add or replace items in its store and
97           call getLocal to return values if they exist.  The Universal structure
98           contains functions to make new tags as well as injection, projection and
99           test functions. *)
100        val getLocal: 'a Universal.tag -> 'a option
101        val setLocal: 'a Universal.tag * 'a -> unit
102        
103        (* Change the specified attribute(s) for the calling thread.  Unspecified
104           attributes remain unchanged. *)
105        val setAttributes: threadAttribute list -> unit
106        (* Get the values of attributes. *)
107        val getAttributes: unit -> threadAttribute list
108
109        (* Return the number of processors that will be used to run threads. *)
110        val numProcessors: unit -> int
111        (* and the number of physical processors if that is available. *)
112        and numPhysicalProcessors: unit -> int option
113    end
114        
115    structure Mutex:
116    sig
117        (* Mutexes.  A mutex provides simple mutual exclusion.  A thread can lock
118           a mutex and until it unlocks it no other thread will be able to lock it.
119           Locking and unlocking are intended to be fast in the situation when
120           there is no other process attempting to lock the mutex.  *)
121        type mutex
122        (* mutex: Make a new mutex *)
123        val mutex: unit -> mutex
124        (* lock:  Lock a mutex.  If the mutex is currently locked the thread is
125           blocked until it is unlocked.  If a thread tries to lock a mutex that
126           it has previously locked the thread will deadlock.
127           N.B.  "lock" is not an interruption point (a point where synchronous
128           interrupts are delivered) even though a thread can be blocked indefinitely. *)
129        val lock: mutex -> unit
130        (* unlock:  Unlock a mutex and allow any waiting threads to run.  The behaviour
131           if the mutex was not previously locked by the calling thread is undefined.  *)
132        val unlock: mutex -> unit
133        (* trylock: Attempt to lock the mutex.  Returns true if the mutex was not
134           previously locked and has now been locked by the calling thread.  Returns
135           false if the mutex was previously locked, including by the calling thread. *)
136        val trylock: mutex -> bool
137        
138        (* These functions may not work correctly if an asynchronous interrupt
139           is delivered during the calls.  A thread should use synchronous interrupt
140           when using these calls. *)
141    end
142    
143    structure ConditionVar:
144    sig
145        (* Condition variables.  Condition variables are used to provide communication
146           between threads.  A condition variable is used in conjunction with a mutex
147           and usually a reference to establish and test changes in state.  The normal
148           use is for one thread to lock a mutex, test the reference and then wait on
149           the condition variable, releasing the lock on the mutex while it does so.
150           Another thread may then lock the mutex, update the reference, unlock the
151           mutex, and signal the condition variable.  This wakes up the first thread
152           and reacquires the lock allowing the thread to test the updated reference
153           with the lock held.
154           More complex communication mechanisms, such as blocking channels, can
155           be written in terms of condition variables. *)
156        type conditionVar
157        (* conditionVar: Make a new condition variable. *)
158        val conditionVar: unit -> conditionVar
159        (* wait: Release the mutex and block until the condition variable is
160           signalled.  When wait returns the mutex has been re-acquired.
161           If thread is handling interrupts synchronously a call to "wait" may cause
162           an Interrupt exception to be delivered.
163           (The implementation must ensure that if an Interrupt is delivered as well
164           as signal waking up a single thread that the interrupted thread does not
165           consume the "signal".)
166           The mutex is (re)acquired before Interrupt is delivered.  *)
167        val wait: conditionVar * Mutex.mutex -> unit
168        (* waitUntil: As wait except that it blocks until either the condition
169           variable is signalled or the time (absolute) is reached.  Either way
170           the mutex is reacquired so there may be a further delay if it is held
171           by another thread.  *)
172        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
173        (* signal: Wake up one thread if any are waiting on the condition variable. *)
174        val signal: conditionVar -> unit
175        (* broadcast: Wake up all threads waiting on the condition variable. *)
176        val broadcast: conditionVar -> unit
177    end
178
179end;
180
181structure Thread :> THREAD =
182struct
183    exception Thread = RunCall.Thread
184    
185    (* Create non-overwritable mutables for mutexes and condition variables.
186       A non-overwritable mutable in the executable or a saved state is not
187       overwritten when a saved state further down the hierarchy is loaded. *)
188    val nvref = LibrarySupport.noOverwriteRef
189    
190    structure Thread =
191    struct
192        open Thread (* Created in INITIALISE with thread type and self function. *)
193
194        (* Equality is pointer equality. *)
195        val equal : thread*thread->bool = op =
196
197        datatype threadAttribute =
198            EnableBroadcastInterrupt of bool
199        |   InterruptState of interruptState
200        |   MaximumMLStack of int option
201        
202        and interruptState =
203            InterruptDefer
204        |   InterruptSynch
205        |   InterruptAsynch
206        |   InterruptAsynchOnce 
207
208        (* Convert attributes to bits and a mask. *)
209        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
210        let
211            (* Check that a particular attribute appears only once.
212               As well as accumulating the actual bits in the result we
213               also accumulate the mask of bits.  If any of these
214               reappear we raise an exception. *)
215            fun checkRepeat(r, acc, set, mask) =
216                if Word.andb(set, mask) <> 0w0
217                then raise Thread "The same attribute appears more than once in the list"
218                else convert(r, acc,  Word.orb(set, mask))
219
220            and convert([], acc, set) = (acc, set)
221              | convert(EnableBroadcastInterrupt true :: r, acc, set) =
222                    checkRepeat(r, Word.orb(acc, 0w1), set, 0w1)
223              | convert(EnableBroadcastInterrupt false :: r, acc, set) =
224                    checkRepeat(r, acc (* No bit *), set, 0w1)
225              | convert(InterruptState s :: r, acc, set) =
226                    checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6)
227              | convert(MaximumMLStack _ :: r, acc, set) =
228                    convert(r, acc, set)
229        in
230            convert(at, 0w0, 0w0)
231        end
232        
233        and setIstateBits InterruptDefer = 0w0
234          | setIstateBits InterruptSynch = 0w2
235          | setIstateBits InterruptAsynch = 0w4
236          | setIstateBits InterruptAsynchOnce = 0w6
237
238        fun getIstateBits(w: Word.word): interruptState =
239        let
240            val ibits = Word.andb(w, 0w6)
241        in
242            if ibits = 0w0
243            then InterruptDefer
244            else if ibits = 0w2
245            then InterruptSynch
246            else if ibits = 0w4
247            then InterruptAsynch
248            else InterruptAsynchOnce
249        end
250
251        fun wordToAttrs w =
252        let
253            (* Enable broadcast - true if bottom bit is set. *)
254            val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1)
255        in
256            [bcast, InterruptState(getIstateBits w)]
257        end
258        
259        exception Interrupt = RunCall.Interrupt
260
261        (* The thread id is opaque outside this structure but is actually a six
262           word mutable object.
263           Word 0: Index into thread table (used inside the RTS only)
264           Word 1: Flags: initialised by the RTS and set by this code
265           Word 2: Thread local store: read and set by this code.
266           Word 3: IntRequest: Set by the RTS if there is an interrupt pending
267           Word 4: Maximum ML stack size.  Unlimited is stored here as zero
268           *)
269        val threadIdFlags       = 0w1
270        and threadIdThreadLocal = 0w2
271        and threadIdIntRequest  = 0w3
272        and threadIdStackSize   = 0w4
273
274        fun getLocal (t: 'a Universal.tag) : 'a option =
275        let
276            val root: Universal.universal ref list =
277                RunCall.loadWord(self(), threadIdThreadLocal)
278
279            fun doFind [] = NONE
280              | doFind ((ref v)::r) =
281                    if Universal.tagIs t v
282                    then SOME(Universal.tagProject t v)
283                    else doFind r
284        in
285            doFind root
286        end
287        
288        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
289        let
290            (* See if we already have this in the list. *)
291            val root: Universal.universal ref list =
292                RunCall.loadWord(self(), threadIdThreadLocal)
293
294            fun doFind [] =
295                    (* Not in the list - Add it. *)
296                    RunCall.storeWord
297                        (self(), threadIdThreadLocal,
298                         ref (Universal.tagInject t newVal) :: root)
299              | doFind (v::r) =
300                    if Universal.tagIs t (!v)
301                        (* If it's in the list update it. *)
302                    then v := Universal.tagInject t newVal
303                    else doFind r
304
305        in
306            doFind root
307        end
308        
309        local
310            val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt"
311        in
312            fun testInterrupt() =
313                (* If there is a pending request the word in the thread object
314                   will be non-zero. *)
315                if RunCall.loadWord(self(), threadIdIntRequest) <> 0
316                then threadTestInterrupt()
317                else ()
318        end
319
320        local
321            fun getAttrWord (me: thread) : Word.word =
322                RunCall.loadWord(me, threadIdFlags)
323
324            fun getStackSizeAsInt (me: thread) : int =
325                RunCall.loadWord(me, threadIdStackSize)
326
327            and getStackSize me : int option =
328                case getStackSizeAsInt me of
329                    0 => NONE
330                |   s => SOME s
331
332            fun newStackSize ([], default) = default
333            |   newStackSize (MaximumMLStack NONE :: _, _) = 0
334            |   newStackSize (MaximumMLStack (SOME n) :: _, _) =
335                    if n <= 0 then raise Thread "The stack size must be greater than zero" else n
336            |   newStackSize (_ :: l, default) = newStackSize (l, default)
337            
338            val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize"
339        in
340            (* Set attributes.  Only changes the values that are specified.  The
341               others remain the same. *)
342            fun setAttributes (attrs: threadAttribute list) : unit =
343            let
344                val me = self()
345                val oldValues: Word.word = getAttrWord me
346                val (newValue, mask) = attrsToWord attrs
347                val stack = newStackSize(attrs, getStackSizeAsInt me)
348            in
349                RunCall.storeWord (self(), threadIdFlags,
350                    Word.orb(newValue, Word.andb(Word.notb mask, oldValues)));
351                if stack = getStackSizeAsInt me
352                then () else threadMaxStackSize stack;
353                (* If we are now handling interrupts asynchronously check whether
354                   we have a pending interrupt now.  This will only be effective
355                   if we were previously handling them synchronously or blocking
356                   them. *)
357                if Word.andb(newValue, 0w4) = 0w4
358                then testInterrupt()
359                else ()
360            end
361                
362            fun getAttributes() : threadAttribute list =
363            let
364                val me = self()
365            in
366                MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me)
367            end
368
369            (* These are used in the ConditionVar structure.  They affect only the
370               interrupt handling bits. *)
371            fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self()))
372            and setInterruptState(s: interruptState): unit =
373                RunCall.storeWord (self(), threadIdFlags,
374                    Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self()))))
375
376            local
377                (* The default for a new thread is to ignore broadcasts and handle explicit
378                   interrupts synchronously. *)
379                val (defaultAttrs, _) =
380                    attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch]
381                val threadForkFunction:
382                    (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread"
383            in
384                fun fork(f:unit->unit, attrs: threadAttribute list): thread =
385                let
386                    (* Any attributes specified explicitly override the defaults. *)
387                    val (attrWord, mask) = attrsToWord attrs
388                    val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs))
389                    val stack = newStackSize(attrs, 0 (* Default is unlimited *))
390                in
391                    threadForkFunction(f, attrValue, stack)
392                end
393            end
394        end
395
396        val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf"
397        and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive"
398        and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt"
399
400        local
401            (* Send an interrupt to a thread.  If it returns false
402               the thread did not exist and this should raise an exception. *)
403            val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread"
404        in
405            fun interrupt(t: thread) =
406                if threadSendInterrupt t
407                then ()
408                else raise Thread "Thread does not exist"
409        end
410
411        local
412            val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread"
413        in
414            fun kill(t: thread) =
415                if threadKillThread t
416                then ()
417                else raise Thread "Thread does not exist"
418        end
419
420        val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors"
421
422        local
423            val numberOfPhysical: unit -> int =
424                RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors"
425        in
426            fun numPhysicalProcessors(): int option =
427                (* It is not always possible to get this information *)
428                case numberOfPhysical() of 0 => NONE | n => SOME n
429        end
430    end
431    
432    structure Mutex =
433    struct
434        type mutex = Word.word ref
435        fun mutex() = nvref 0w1; (* Initially unlocked. *)
436        open Thread  (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *)
437        
438        val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock"
439        val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock"
440
441        (* A mutex is implemented as a Word.word ref.  It is initially set to 1 and locked
442           by atomically decrementing it.  If it was previously unlocked the result will
443           by zero but if it was already locked it will be some negative value.  When it
444           is unlocked it is atomically incremented.  If there was no contention the result
445           will again be 1 but if some other thread tried to lock it the result will be
446           zero or negative.  In that case the unlocking thread needs to call in to the
447           RTS to wake up the blocked thread.
448
449           The cost of contention on the lock is very high.  To try to avoid this we
450           first loop (spin) to see if we can get the lock without contention.  *)
451
452        val spin_cycle = 20000
453        fun spin (m: mutex, c: int) =
454           if ! m = 0w1 then ()
455           else if c = spin_cycle then ()
456           else spin(m, c+1);
457
458        fun lock (m: mutex): unit =
459        let
460            val () = spin(m, 0)
461            val newValue = atomicDecr m
462        in
463            if newValue = 0w0
464            then () (* We've acquired the lock. *)
465            else (* It's locked.  We return when we have the lock. *)
466            (
467                threadMutexBlock m;
468                lock m (* Try again. *)
469            )
470        end
471
472        fun unlock (m: mutex): unit =
473        let
474            val newValue = atomicIncr m
475        in
476            if newValue = 0w1
477            then () (* No contention. *)
478            else
479                (* Another thread has blocked and we have to release it.  We can safely
480                   set the value to 1 here to release the lock.  If another thread
481                   acquires it before we have woken up the other threads that's fine.
482                   Equally, if another thread decremented the count and saw it was
483                   still locked it will enter the RTS and try to acquire the lock
484                   there.
485                   It's probably better to reset it here rather than within the RTS
486                   since it allows another thread to acquire the lock immediately
487                   rather than after the rather long process of entering the RTS.
488                   Resetting this needs to be atomic with respect to atomic increment
489                   and decrement.  That's not a problem on X86 so a simple assignment
490                   is sufficient but in the interpreter at least it's necessary to
491                   acquire a lock. *)
492            (
493                atomicReset m;
494                threadMutexUnlock m
495            )
496        end
497
498        (* Try to lock the mutex.  If it was previously unlocked then lock it and
499           return true otherwise return false.  Because we don't block here there is
500           the possibility that the thread that has locked it could release the lock
501           shortly afterwards.  The check for !m = 0w1 is an optimisation and nearly
502           all the time it avoids the call to atomicDecr setting m to a negative value.
503           There is a small chance that another thread could lock the mutex between the
504           test for !m = 0w1 and the atomicDecr.  In that case the atomicDecr would
505           return a negative value and the function that locked the mutex will have to
506           call into the RTS to reset it when it is unlocked.  *)
507        fun trylock (m: mutex): bool =
508            if !m = 0w1 andalso atomicDecr m = 0w0
509            then true (* We've acquired the lock. *)
510            else false (* The lock was taken. *)
511    end
512
513    structure ConditionVar =
514    struct
515        open Thread
516
517        (* A condition variable contains a lock and a list of suspended threads. *)
518        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
519        fun conditionVar(): conditionVar =
520            { lock = Mutex.mutex(), threads = nvref nil }
521
522        local
523            val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait"
524            and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil"
525        in
526            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
527            let
528                val me = self() (* My thread id. *)
529                
530                fun waitAgain() =
531                let
532                    fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t
533                    
534                    fun removeThis [] = raise Fail "Thread missing in list"
535                     |  removeThis (h::t) = if equal(h, me) then t else h :: removeThis t
536                     
537                    val () =
538                        case t of
539                            SOME time => threadCondVarWaitUntil(lock, time)
540                        |   NONE => threadCondVarWait lock
541
542                    val () = Mutex.lock lock (* Get the lock again.  *)
543                    
544                    (* Are we still on the list?  If so we haven't been explicitly woken
545                       up.  We've either timed out, been interrupted or simply returned
546                       because the RTS needed to process some asynchronous results.  *)
547                    val stillThere = doFind(!threads)
548                    open Time (* For >= *)
549                in
550                    if not stillThere
551                    then (* We're done. *)
552                    (
553                        Mutex.unlock lock;
554                        true
555                    )
556                    else if (case t of NONE => false | SOME t => Time.now() >= t)
557                    then (* We've timed out. *)
558                    (
559                        threads := removeThis(! threads);
560                        Mutex.unlock lock;
561                        false
562                    )
563                    else
564                    (
565                        (* See if we've been interrupted.  If so remove ourselves
566                           and exit. *)
567                        testInterrupt()
568                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
569                        (* Otherwise just keep waiting. *)
570                        waitAgain()
571                    )
572                end  
573            in
574                Mutex.lock lock; (* Lock the internal mutex. *)
575                Mutex.unlock m; (* Unlock the external mutex *)
576                threads := me :: !threads; (* Add ourselves to the list. *)
577                waitAgain() (* Wait and return the result when we're done. *)
578            end
579
580            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
581            let
582                val originalIntstate = getInterruptState()
583                (* Set this to handle interrupts synchronously unless we're already
584                   ignoring them. *)
585                val () =
586                    if originalIntstate = InterruptDefer
587                    then ()
588                    else setInterruptState InterruptSynch;
589                    
590                (* Wait for the condition.  If it raises an exception we still
591                   need to reacquire the lock unless we were handling interrupts
592                   asynchronously. *)
593                val result =
594                    innerWait(c, m, t) handle exn =>
595                        (
596                            (* We had an exception.  If we were handling exceptions synchronously
597                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
598                               counts as a single asynchronous exception and we restore the
599                               state as InterruptSynch. *)
600                            case originalIntstate of
601                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
602                            |   InterruptSynch => Mutex.lock m
603                            |   InterruptAsynch => setInterruptState InterruptAsynch
604                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;
605
606                            raise exn (* Reraise the exception*)
607                        )
608            in
609                (* Restore the original interrupt state first. *)
610                setInterruptState originalIntstate;
611                (* Normal return.  Reacquire the lock before returning. *)
612                Mutex.lock m;
613                result
614            end
615
616            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
617                (doWait(c, m, NONE); ())
618            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
619                doWait(c, m, SOME t)
620        end
621        
622        local
623            (* This call wakes up the specified thread.  If the thread has already been
624               interrupted and is not ignoring interrupts it returns false.  Otherwise
625               it wakes up the thread and returns true.  We have to use this because
626               we define that if a thread is interrupted before it is signalled then
627               it raises Interrupt. *)
628            val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake"
629            
630            (* Wake a single thread if we can (signal). *)
631            fun wakeOne [] = []
632            |   wakeOne (thread::rest) =
633                    if threadCondVarWake thread
634                    then rest
635                    else thread :: wakeOne rest
636            (* Wake all threads (broadcast). *)
637            fun wakeAll [] = [] (* Always returns the empty list. *)
638            |   wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest)
639            
640            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
641            let
642                val originalState = getInterruptState()
643            in
644                (* Set this to handle interrupts synchronously unless we're already
645                   ignoring them.  We need to do this to avoid an asynchronous
646                   interrupt which could leave the internal lock in an inconsistent state. *)
647                if originalState = InterruptDefer
648                then ()
649                else setInterruptState InterruptSynch;
650                (* Get the condition var lock. *)
651                Mutex.lock lock;
652                threads := wakeThreads(! threads);
653                Mutex.unlock lock;
654                setInterruptState originalState; (* Restore original state. *)
655                (* Test if we were interrupted while we were handling
656                   interrupts synchronously. *)
657                if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce
658                then testInterrupt()
659                else ()
660            end
661        in
662            fun signal cv = signalOrBroadcast(cv, wakeOne)
663            and broadcast cv = signalOrBroadcast(cv, wakeAll)
664        end
665    end
666end;
667
668structure ThreadLib:
669sig
670    val protect: Thread.Mutex.mutex -> ('a -> 'b) -> 'a -> 'b
671end =
672struct
673    (* This applies a function while a mutex is being held. 
674       Although this can be defined in terms of Thread.Thread.getAttributes it's
675       defined here using the underlying calls.  The original version with
676       getAttributes appeared as a major allocation hot-spot when building the
677       compiler because "protect" is called round every access to the global
678       name-space. *)
679    fun protect m f a =
680    let
681        open Thread.Thread Thread.Mutex
682        open Word
683        (* Set this to handle interrupts synchronously except if we are blocking
684           them.  We don't want to get an asynchronous interrupt while we are
685           actually locking or unlocking the mutex but if we have to block to do
686           IO then we should allow an interrupt at that point. *)
687        val oldAttrs: Word.word = RunCall.loadWord(self(), 0w1)
688        val () =
689            if andb(oldAttrs, 0w6) = 0w0 (* Already deferred? *)
690            then ()
691            else RunCall.storeWord (self(), 0w1,
692                    orb(andb(notb 0w6, oldAttrs), 0w2))
693        fun restoreAttrs() =
694        (
695            RunCall.storeWord (self(), 0w1, oldAttrs);
696            if andb(oldAttrs, 0w4) = 0w4 then testInterrupt() else ()
697        )
698        val () = lock m
699        val result = f a
700            handle exn =>
701            (
702                unlock m; restoreAttrs();
703                (* Reraise the exception preserving the location information. *)
704                PolyML.Exception.reraise exn
705            )
706    in
707        unlock m;
708        restoreAttrs();
709        result
710    end
711end;
712
713
714local
715    fun prettyMutex _ _ (_: Thread.Mutex.mutex) = PolyML.PrettyString "?"
716    and prettyThread _ _ (_: Thread.Thread.thread) = PolyML.PrettyString "?"
717    and prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = PolyML.PrettyString "?"
718in
719    val () = PolyML.addPrettyPrinter prettyMutex
720    and () = PolyML.addPrettyPrinter prettyThread
721    and () = PolyML.addPrettyPrinter prettyCondVar
722end;
723
724
725