1(*  Title:      Pure/Concurrent/event_timer.ML
2    Author:     Makarius
3
4Initiate event after given point in time.
5
6Note: events are run as synchronized action within a dedicated thread
7and should finish quickly without further ado.
8*)
9
10signature EVENT_TIMER =
11sig
12  eqtype request
13  val request: Time.time -> (unit -> unit) -> request
14  val cancel: request -> bool
15  val future: Time.time -> unit Future.future
16  val shutdown: unit -> unit
17end;
18
19structure Event_Timer:> EVENT_TIMER =
20struct
21
22open Portable
23(* type request *)
24
25val request_counter = Counter.make ();
26datatype request = Request of int;
27fun new_request () = Request (request_counter ());
28
29
30(* type requests *)
31
32structure Requests = Table(
33            type key = Time.time
34            val ord = Time.compare
35            fun pp t = HOLPP.add_string (Time.toString t)
36          );
37type requests = (request * (unit -> unit)) list Requests.table;
38
39fun add_request time entry (requests: requests) =
40  Requests.cons_list (time, entry) requests;
41
42fun del_request req (requests: requests) =
43  let
44    val old_request =
45      requests |> Requests.get_first (fn (key, entries) =>
46        entries |> get_first (fn entry => if fst entry = req then SOME (key, entry) else NONE));
47  in
48    (case old_request of
49      NONE => (false, requests)
50    | SOME old => (true, Requests.remove_list (fst_eq equal) old requests))
51  end;
52
53fun next_request_time (requests: requests) =
54  Option.map fst (Requests.min requests);
55
56fun next_request_event t0 (requests: requests) =
57  (case Requests.min requests of
58    NONE => NONE
59  | SOME (time, entries) =>
60      if t0 < time then NONE
61      else
62        let
63          val (rest, (_, event)) = front_last entries;
64          val requests' =
65            if null rest then Requests.delete time requests
66            else Requests.update (time, rest) requests;
67        in SOME (event, requests') end);
68
69
70(* global state *)
71
72datatype status = Normal | Shutdown_Req | Shutdown_Ack;
73
74datatype state =
75  State of {requests: requests, status: status, manager: Thread.thread option};
76
77fun make_state (requests, status, manager) =
78  State {requests = requests, status = status, manager = manager};
79
80val normal_state = make_state (Requests.empty, Normal, NONE);
81val shutdown_ack_state = make_state (Requests.empty, Shutdown_Ack, NONE);
82
83fun is_shutdown s (State {requests, status, manager}) =
84  Requests.is_empty requests andalso status = s andalso not (isSome manager);
85
86fun is_shutdown_req (State {requests, status, ...}) =
87  Requests.is_empty requests andalso status = Shutdown_Req;
88
89val state = Synchronized.var "Event_Timer.state" normal_state;
90
91
92(* manager thread *)
93
94fun manager_loop () =
95  if Synchronized.timed_access state
96    (fn State {requests, ...} => next_request_time requests)
97    (fn st as State {requests, status, manager} =>
98      (case next_request_event (Time.now ()) requests of
99        SOME (event, requests') =>
100          let
101            val _ = Exn.capture event ();
102            val state' = make_state (requests', status, manager);
103          in SOME (true, state') end
104      | NONE =>
105          if is_shutdown_req st
106          then SOME (false, shutdown_ack_state)
107          else NONE)) <> SOME false
108  then manager_loop () else ();
109
110fun manager_check manager =
111  if isSome manager andalso Thread.isActive (valOf manager) then manager
112  else
113    SOME (Standard_Thread.fork {name = "event_timer", stack_limit = NONE,
114                                interrupts = false}
115      manager_loop);
116
117fun shutdown () =
118  Thread_Attributes.uninterruptible (fn restore_attributes => fn () =>
119    if Synchronized.change_result state (fn st as State {requests, manager, ...} =>
120      if is_shutdown Normal st then (false, st)
121      else if is_shutdown Shutdown_Ack st orelse is_shutdown_req st then
122        raise Fail "Concurrent attempt to shutdown event timer"
123      else (true, make_state (requests, Shutdown_Req, manager_check manager)))
124    then
125      restore_attributes (fn () =>
126        Synchronized.guarded_access state
127          (fn st => if is_shutdown Shutdown_Ack st then SOME ((), normal_state) else NONE)) ()
128      handle exn =>
129        if Exn.is_interrupt exn then
130          Synchronized.change state (fn State {requests, manager, ...} =>
131            make_state (requests, Normal, manager))
132        else ()
133    else ()) ();
134
135
136(* main operations *)
137
138fun request time event =
139  Synchronized.change_result state (fn State {requests, status, manager} =>
140    let
141      val req = new_request ();
142      val requests' = add_request time (req, event) requests;
143      val manager' = manager_check manager;
144    in (req, make_state (requests', status, manager')) end);
145
146fun cancel req =
147  Synchronized.change_result state (fn State {requests, status, manager} =>
148    let
149      val (canceled, requests') = del_request req requests;
150      val manager' = manager_check manager;
151    in (canceled, make_state (requests', status, manager')) end);
152
153val future = Thread_Attributes.uninterruptible (fn _ => fn time =>
154  let
155    val req: request Single_Assignment.var = Single_Assignment.var "request"
156    fun abort () = ignore (cancel (Single_Assignment.await req))
157    val promise: unit Future.future = Future.promise_name "event_timer" abort
158    val _ = Single_Assignment.assign req (request time (Future.fulfill promise))
159  in
160    promise
161  end);
162
163end;
164