1(*  Title:      Pure/PIDE/execution.ML
2    Author:     Makarius
3
4Global management of execution.  Unique running execution serves as
5barrier for further exploration of forked command execs.
6*)
7
signature EXECUTION =
sig
  (*parameters for fork and print*)
  type params = {name: string, pos: Position.T, pri: int}

  (*unique running execution*)
  val start: unit -> Document_ID.execution
  val discontinue: unit -> unit
  val is_running: Document_ID.execution -> bool

  (*active nodes*)
  val active_tasks: string -> Future.task list
  val worker_task_active: bool -> string -> unit

  (*running execs*)
  val is_running_exec: Document_ID.exec -> bool
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool

  (*exec groups and tasks*)
  val snapshot: Document_ID.exec list -> Future.task list
  val join: Document_ID.exec list -> unit
  val peek: Document_ID.exec -> Future.group list
  val cancel: Document_ID.exec -> unit

  (*fork and print*)
  val fork: params -> (unit -> 'a) -> 'a future
  val print: params -> (unit -> unit) -> unit
  val fork_prints: Document_ID.exec -> unit

  (*cleanup*)
  val purge: Document_ID.exec list -> unit
  val reset: unit -> Future.group list
  val shutdown: unit -> unit
end;
29
30structure Execution: EXECUTION =
31struct
32
33(* global state *)
34
(*pending print operation: registered by "print", run by "fork_prints"*)
type print = {name: string, pri: int, body: unit -> unit};

(*table indexed by exec id: active fork groups and pending prints*)
type execs = (Future.group list * print list) Inttab.table;

(*initial table: single entry for Document_ID.none, without forks or prints*)
val init_execs: execs =
  Inttab.empty |> Inttab.update (Document_ID.none, ([], []));
38
(*global state: id of the overall document execution, tasks of active nodes
  (indexed by node name), and the table of running command execs*)
datatype state =
  State of
   {execution_id: Document_ID.execution,
    nodes: Future.task list Symtab.table,
    execs: execs};

(*build a state value from its three components*)
fun make_state (id, active_nodes, running_execs) =
  State {execution_id = id, nodes = active_nodes, execs = running_execs};
47
local
  val initial_state = make_state (Document_ID.none, Symtab.empty, init_execs);
  val state = Synchronized.var "Execution.state" initial_state;

  (*state components as triple, as expected by the update functions below*)
  fun dest_state (State {execution_id, nodes, execs}) = (execution_id, nodes, execs);
in

(*current state as record of its fields*)
fun get_state () =
  (case Synchronized.value state of State args => args);

(*update the synchronized state via f on the triple of components,
  returning the extra result produced by f*)
fun change_state_result f =
  Synchronized.change_result state (fn st =>
    let val (result, args') = f (dest_state st)
    in (result, make_state args') end);

(*update the synchronized state via f on the triple of components*)
fun change_state f =
  Synchronized.change state (make_state o f o dest_state);

end;
65
(*error message for an exec id that is missing from the execs table*)
fun unregistered exec_id =
  let val id = Document_ID.print exec_id
  in "Unregistered execution: " ^ id end;
67
68
69(* unique running execution *)
70
(*install a new id from Document_ID.make as the current execution and return it;
  nodes and execs are left unchanged*)
fun start () =
  let val execution_id = Document_ID.make () in
    change_state (fn (_, nodes, execs) => (execution_id, nodes, execs));
    execution_id
  end;
76
(*replace the current execution id by Document_ID.none; nodes and execs are kept*)
fun discontinue () =
  let fun clear_id (_, nodes, execs) = (Document_ID.none, nodes, execs)
  in change_state clear_id end;
79
(*check whether the given id is the execution id of the current state*)
fun is_running execution_id =
  let val {execution_id = current_id, ...} = get_state ()
  in execution_id = current_id end;
82
83
84(* active nodes *)
85
(*tasks currently recorded for the given node name (empty if none)*)
fun active_tasks node_name =
  let val {nodes, ...} = get_state ()
  in Symtab.lookup_list nodes node_name end;
88
(*record (insert = true) or drop (insert = false) the current worker task for
  the given node name; the worker task is determined within the state update*)
fun worker_task_active insert node_name =
  let
    fun update_nodes nodes =
      let
        val task = the (Future.worker_task ());
        val entry = (node_name, task);
      in
        if insert then Symtab.insert_list Task_Queue.eq_task entry nodes
        else Symtab.remove_list Task_Queue.eq_task entry nodes
      end;
  in
    change_state (fn (execution_id, nodes, execs) =>
      (execution_id, update_nodes nodes, execs))
  end;
96
97
98(* running execs *)
99
(*check whether the exec id has an entry in the table of running execs*)
fun is_running_exec exec_id =
  let val {execs, ...} = get_state ()
  in Inttab.defined execs exec_id end;
102
(*register exec_id with the given groups (and no prints), provided that
  execution_id is the current execution and exec_id is not yet registered;
  the result tells whether the registration has happened*)
fun running execution_id exec_id groups =
  change_state_result (fn (current_id, nodes, execs) =>
    if execution_id = current_id andalso not (Inttab.defined execs exec_id)
    then (true, (current_id, nodes, Inttab.update (exec_id, (groups, [])) execs))
    else (false, (current_id, nodes, execs)));
109
110
111(* exec groups and tasks *)
112
(*groups registered for exec_id; empty list for an unknown exec*)
fun exec_groups (execs: execs) exec_id =
  Inttab.lookup execs exec_id
  |> Option.map #1
  |> the_default [];
117
(*Future.snapshot of all groups that belong to the given execs, taken within
  an update of the global state that leaves the state itself unchanged*)
fun snapshot exec_ids =
  if null exec_ids then []
  else
    change_state_result (fn (state as (_, _, execs)) =>
      let val groups = maps (exec_groups execs) exec_ids
      in (Future.snapshot groups, state) end);
122
(*wait for the tasks of the given execs: fork a trivial future that depends on
  the current snapshot, join it, and repeat until the snapshot is empty*)
fun join exec_ids =
  let val tasks = snapshot exec_ids in
    if null tasks then ()
    else
      let
        val params =
          {name = "Execution.join", group = SOME (Future.new_group NONE),
            deps = tasks, pri = 0, interrupts = false};
        val barrier = singleton (Future.forks params) I;
        val () = Future.join barrier;
      in join exec_ids end
  end;
131
(*groups of exec_id according to the current state*)
fun peek exec_id =
  let val {execs, ...} = get_state ()
  in exec_groups execs exec_id end;
133
(*cancel all groups currently registered for exec_id*)
fun cancel exec_id =
  peek exec_id |> List.app Future.cancel_group;
135
136
137(* fork *)
138
(*emit status markups; the task is included as property only if the
  multithreading trace level is at least 2*)
fun status task markups =
  let
    val tracing = ! Multithreading.trace >= 2;
    val props = if tracing then [(Markup.taskN, Task_Queue.str_of_task task)] else [];
    fun render markup = Markup.markup_only (Markup.properties props markup);
  in Output.status (map render markups) end;
145
(*parameters of fork and print: name, position (parsed for the exec id), priority*)
type params =
 {name: string,
  pos: Position.T,
  pri: int};
147
(*fork evaluation of e as future within a new subgroup of the worker group:
  the group is registered under the exec id parsed from pos (default 0), which
  must be already known; status markup is emitted for the life-cycle of the task*)
fun fork ({name, pos, pri}: params) e =
  Thread_Attributes.uninterruptible (fn _ => Position.setmp_thread_data pos (fn () =>
    let
      val exec_id = the_default 0 (Position.parse_id pos);
      val group = Future.worker_subgroup ();

      (*add the new group to the entry of its exec*)
      fun register (execution_id, nodes, execs) =
        (case Inttab.lookup execs exec_id of
          NONE => raise Fail (unregistered exec_id)
        | SOME (groups, prints) =>
            (execution_id, nodes, Inttab.update (exec_id, (group :: groups, prints)) execs));
      val _ = change_state register;

      (*status for the outcome of the body; error messages are only printed
        for a proper exec id*)
      fun report_result task (Exn.Exn exn) =
            (status task [Markup.failed];
             status task [Markup.finished];
             Output.report [Markup.markup_only (Markup.bad ())];
             if exec_id = 0 then ()
             else List.app (Future.error_message pos) (Runtime.exn_messages exn))
        | report_result task (Exn.Res _) = status task [Markup.finished];

      (*body of the forked task*)
      fun run () =
        let
          val task = the (Future.worker_task ());
          val _ = status task [Markup.running];
          val result =
            Exn.capture (Future.interruptible_task e) ()
            |> Future.identify_result pos
            |> Exn.map_exn Runtime.thread_context;
          val errors = Exn.capture (fn () => report_result task result) ();
          val _ = status task [Markup.joined];
        in Exn.release errors; Exn.release result end;

      val future =
        (singleton o Future.forks)
          {name = name, group = SOME group, deps = [], pri = pri, interrupts = false} run;

      val _ = status (Future.task_of future) [Markup.forked];
    in future end)) ();
187
188
189(* print *)
190
(*register e as pending print for the exec id parsed from pos (default 0);
  fails for an exec that is not registered*)
fun print ({name, pos, pri}: params) e =
  let
    fun add_print (execution_id, nodes, execs) =
      let val exec_id = the_default 0 (Position.parse_id pos) in
        (case Inttab.lookup execs exec_id of
          NONE => raise Fail (unregistered exec_id)
        | SOME (groups, prints) =>
            let val entry = (groups, {name = name, pri = pri, body = e} :: prints)
            in (execution_id, nodes, Inttab.update (exec_id, entry) execs) end)
      end;
  in change_state add_print end;
203
(*run the pending prints of exec_id in order of registration: either forked at
  the position of the current thread (if Future.relevant), or directly in sequence;
  fails for an exec that is not registered*)
fun fork_prints exec_id =
  let val {execs, ...} = get_state () in
    (case Inttab.lookup execs exec_id of
      NONE => raise Fail (unregistered exec_id)
    | SOME (_, prints) =>
        let
          val ordered = rev prints;  (*prints are accumulated in reverse*)
          fun fork_print pos ({name, pri, body}: print) =
            ignore (fork {name = name, pos = pos, pri = pri} body);
          fun run_print ({body, ...}: print) = body ();
        in
          if Future.relevant prints
          then List.app (fork_print (Position.thread_data ())) ordered
          else List.app run_print ordered
        end)
  end;
214
215
216(* cleanup *)
217
(*remove the given execs from the table of running execs (unknown ids are
  ignored); fails if a removed exec still has a group that is not canceled,
  and then the state remains unchanged*)
fun purge exec_ids =
  change_state (fn (execution_id, nodes, execs) =>
    let
      val execs' = fold Inttab.delete_safe exec_ids execs;
      (*check the entries that are actually removed: this needs to traverse
        the old table, because every entry of execs' is trivially defined in
        execs' and the check would never apply*)
      val () =
        (execs, ()) |-> Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
          if Inttab.defined execs' exec_id then ()
          else groups |> List.app (fn group =>
            if Task_Queue.is_canceled group then ()
            else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id)));
    in (execution_id, nodes, execs') end);
229
(*reset the global state to its initial content, returning the groups of all
  execs that were registered before*)
fun reset () =
  let
    fun collect (_, (groups, _)) acc = groups @ acc;
    fun clear (_, _, execs) =
      (Inttab.fold collect execs [], (Document_ID.none, Symtab.empty, init_execs));
  in change_state_result clear end;
234
(*shutdown futures and reset the global state; exceptions recorded in the
  status of the former exec groups are raised as parallel exception*)
fun shutdown () =
  let
    val _ = Future.shutdown ();
    val exns = maps Task_Queue.group_status (reset ());
  in if null exns then () else raise Par_Exn.make exns end;
240
241end;
242