(*  Title:      Pure/Concurrent/event_timer.ML
    Author:     Makarius

Initiate event after given point in time.

Note: events are run as synchronized action within a dedicated thread
and should finish quickly without further ado.
*)

signature EVENT_TIMER =
sig
  eqtype request
  val request: Time.time -> (unit -> unit) -> request
  val cancel: request -> bool
  val future: Time.time -> unit Future.future
  val shutdown: unit -> unit
end;

structure Event_Timer:> EVENT_TIMER =
struct

open Portable

(* type request *)

(*A request is identified by an integer obtained from request_counter.*)
val request_counter = Counter.make ();
datatype request = Request of int;
fun new_request () = Request (request_counter ());


(* type requests *)

(*Table keyed by the point in time of a request; each key holds the list
  of (request, event) entries registered for that time.*)
structure Requests = Table(
  type key = Time.time
  val ord = Time.compare
  fun pp t = HOLPP.add_string (Time.toString t)
);
type requests = (request * (unit -> unit)) list Requests.table;

(*Register entry = (request, event) under the given time.*)
fun add_request time entry (requests: requests) =
  Requests.cons_list (time, entry) requests;

(*Remove the entry whose request equals req.  Result: (true, updated table)
  if such an entry was found, (false, unchanged table) otherwise.*)
fun del_request req (requests: requests) =
  let
    val old_request =
      requests |> Requests.get_first (fn (key, entries) =>
        entries |> get_first (fn entry =>
          if fst entry = req then SOME (key, entry) else NONE));
  in
    (case old_request of
      NONE => (false, requests)
    | SOME old => (true, Requests.remove_list (fst_eq equal) old requests))
  end;

(*Key of Requests.min, i.e. the time of the next pending request, if any.*)
fun next_request_time (requests: requests) =
  Option.map fst (Requests.min requests);

(*Take one event that is due at time t0.  Result: NONE if the table is empty
  or its minimal time is still later than t0; otherwise SOME (event, table
  without that entry).  The last entry of the list at the minimal time is
  taken; the key is deleted once its list has no remaining entries.*)
fun next_request_event t0 (requests: requests) =
  (case Requests.min requests of
    NONE => NONE
  | SOME (time, entries) =>
      (*explicit Time.<: the Standard ML Basis does not overload "<"
        at Time.time*)
      if Time.< (t0, time) then NONE
      else
        let
          val (rest, (_, event)) = front_last entries;
          val requests' =
            if null rest then Requests.delete time requests
            else Requests.update (time, rest) requests;
        in SOME (event, requests') end);


(* global state *)

datatype status = Normal | Shutdown_Req | Shutdown_Ack;

datatype state =
  State of {requests: requests, status: status, manager: Thread.thread option};

fun make_state (requests, status, manager) =
  State {requests = requests, status = status, manager = manager};

val normal_state = make_state (Requests.empty, Normal, NONE);
val shutdown_ack_state = make_state (Requests.empty, Shutdown_Ack, NONE);

(*Idle state with status s: no pending requests and no manager thread.*)
fun is_shutdown s (State {requests, status, manager}) =
  Requests.is_empty requests andalso status = s andalso not (isSome manager);

(*Shutdown has been requested and no requests are pending; the manager
  field is not inspected.*)
fun is_shutdown_req (State {requests, status, ...}) =
  Requests.is_empty requests andalso status = Shutdown_Req;

val state = Synchronized.var "Event_Timer.state" normal_state;


(* manager thread *)

(*Body of the manager thread.  Each iteration performs one timed access to
  the state, with next_request_time as the time limit function:
    - if an event is due wrt. Time.now (), it is run via Exn.capture
      (its outcome is discarded) and removed from the state; result true;
    - otherwise, if is_shutdown_req holds, the state becomes
      shutdown_ack_state; result false;
    - otherwise the access function yields NONE.
  The loop repeats unless the access returns SOME false.*)
fun manager_loop () =
  if Synchronized.timed_access state
    (fn State {requests, ...} => next_request_time requests)
    (fn st as State {requests, status, manager} =>
      (case next_request_event (Time.now ()) requests of
        SOME (event, requests') =>
          let
            val _ = Exn.capture event ();
            val state' = make_state (requests', status, manager);
          in SOME (true, state') end
      | NONE =>
          if is_shutdown_req st
          then SOME (false, shutdown_ack_state)
          else NONE)) <> SOME false
  then manager_loop () else ();

(*Return the given manager if it is an active thread; otherwise fork a new
  "event_timer" thread running manager_loop and return that.*)
fun manager_check manager =
  let
    fun fork_manager () =
      SOME (Standard_Thread.fork {name = "event_timer", stack_limit = NONE,
        interrupts = false}
        manager_loop);
  in
    (case manager of
      SOME thread => if Thread.isActive thread then manager else fork_manager ()
    | NONE => fork_manager ())
  end;

(*Shut down the event timer:
    - no-op if the state is already idle with status Normal;
    - raises Fail if the state is idle with status Shutdown_Ack, or
      is_shutdown_req already holds;
    - otherwise sets status Shutdown_Req (ensuring a manager thread via
      manager_check) and waits until the state is idle with status
      Shutdown_Ack, then resets it to normal_state.
  If the wait raises an interrupt, the status is set back to Normal.
  NOTE(review): the handler returns () for every exception and does not
  re-raise -- confirm that this best-effort behaviour is intended.*)
fun shutdown () =
  Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
    if Synchronized.change_result state (fn st as State {requests, manager, ...} =>
      if is_shutdown Normal st then (false, st)
      else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
        raise Fail "Concurrent attempt to shutdown event timer"
      else (true, make_state (requests, Shutdown_Req, manager_check manager)))
    then
      restore_attributes (fn () =>
        Synchronized.guarded_access state
          (fn st => if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE)) ()
      handle exn =>
        if Exn.is_interrupt exn then
          Synchronized.change state (fn State {requests, manager, ...} =>
            make_state (requests, Normal, manager))
        else ()
    else ()) ();


(* main operations *)

(*Register event under the given time and return the fresh request that
  identifies it; manager_check ensures a manager thread.*)
fun request time event =
  Synchronized.change_result state (fn State {requests, status, manager} =>
    let
      val req = new_request ();
      val requests' = add_request time (req, event) requests;
      val manager' = manager_check manager;
    in (req, make_state (requests', status, manager')) end);

(*Remove the entry of req from the pending requests; the result tells
  whether an entry was found.  manager_check is applied here as well.*)
fun cancel req =
  Synchronized.change_result state (fn State {requests, status, manager} =>
    let
      val (canceled, requests') = del_request req requests;
      val manager' = manager_check manager;
    in (canceled, make_state (requests', status, manager')) end);

(*Promise named "event_timer" with Future.fulfill registered as the event
  for the given time.  The abort action cancels that request; it obtains
  the request from a single-assignment variable, because the request is
  only created after the promise.*)
val future = Thread_Attributes.uninterruptible (fn _ => fn time =>
  let
    val req: request Single_Assignment.var = Single_Assignment.var "request"
    fun abort () = ignore (cancel (Single_Assignment.await req))
    val promise: unit Future.future = Future.promise_name "event_timer" abort
    val _ = Single_Assignment.assign req (request time (Future.fulfill promise))
  in
    promise
  end);

end;