(*  Title:      Pure/PIDE/execution.ML
    Author:     Makarius

Global management of execution.  Unique running execution serves as
barrier for further exploration of forked command execs.
*)

signature EXECUTION =
sig
  (*unique running execution*)
  val start: unit -> Document_ID.execution
  val discontinue: unit -> unit
  val is_running: Document_ID.execution -> bool
  (*active nodes*)
  val active_tasks: string -> Future.task list
  val worker_task_active: bool -> string -> unit
  (*running execs*)
  val is_running_exec: Document_ID.exec -> bool
  val running: Document_ID.execution -> Document_ID.exec -> Future.group list -> bool
  (*exec groups and tasks*)
  val snapshot: Document_ID.exec list -> Future.task list
  val join: Document_ID.exec list -> unit
  val peek: Document_ID.exec -> Future.group list
  val cancel: Document_ID.exec -> unit
  (*fork and print*)
  type params = {name: string, pos: Position.T, pri: int}
  val fork: params -> (unit -> 'a) -> 'a future
  val print: params -> (unit -> unit) -> unit
  val fork_prints: Document_ID.exec -> unit
  (*cleanup*)
  val purge: Document_ID.exec list -> unit
  val reset: unit -> Future.group list
  val shutdown: unit -> unit
end;

structure Execution: EXECUTION =
struct

(* global state *)

(*deferred print: recorded by "print", later run or forked by "fork_prints"*)
type print = {name: string, pri: int, body: unit -> unit};

(*per exec_id: groups of its active forks and its prints, both most recent first*)
type execs = (Future.group list * print list) Inttab.table;

(*initial table: single entry for Document_ID.none, without groups or prints*)
val init_execs: execs = Inttab.make [(Document_ID.none, ([], []))];

datatype state =
  State of
   {execution_id: Document_ID.execution,  (*overall document execution*)
    nodes: Future.task list Symtab.table,  (*active nodes*)
    execs: execs};  (*running command execs*)

fun make_state (execution_id, nodes, execs) =
  State {execution_id = execution_id, nodes = nodes, execs = execs};

local
  val state =
    Synchronized.var "Execution.state" (make_state (Document_ID.none, Symtab.empty, init_execs));

  (*inverse of make_state*)
  fun dest_state (State {execution_id, nodes, execs}) = (execution_id, nodes, execs);
in

(*current state as record of its three fields*)
fun get_state () =
  (case Synchronized.value state of State args => args);

(*replace the state triple via f, which also yields the overall result*)
fun change_state_result f =
  Synchronized.change_result state (fn st => f (dest_state st) ||> make_state);

(*replace the state triple via f*)
fun change_state f =
  Synchronized.change state (make_state o f o dest_state);

end;

(*message text for an exec_id that has no entry in the execs table*)
fun unregistered exec_id = "Unregistered execution: " ^ Document_ID.print exec_id;


(* unique running execution *)

(*overwrite the execution_id field only*)
fun set_execution_id id =
  change_state (fn (_, nodes, execs) => (id, nodes, execs));

(*make a fresh id, store it as current execution_id, and return it*)
fun start () = Document_ID.make () |> tap set_execution_id;

(*store Document_ID.none as current execution_id*)
fun discontinue () = set_execution_id Document_ID.none;

(*compare against the currently stored execution_id*)
fun is_running execution_id =
  let val {execution_id = current_id, ...} = get_state ()
  in execution_id = current_id end;


(* active nodes *)

(*tasks currently listed for the given node name*)
fun active_tasks node_name =
  let val {nodes, ...} = get_state ()
  in Symtab.lookup_list nodes node_name end;

(*insert (insert = true) or remove (insert = false) the current worker task
  in the task list of node_name; the worker task is determined inside the
  state update, as "the (Future.worker_task ())"*)
fun worker_task_active insert node_name =
  change_state (fn (execution_id, nodes, execs) =>
    let
      val entry = (node_name, the (Future.worker_task ()));
      val nodes' =
        if insert then Symtab.insert_list Task_Queue.eq_task entry nodes
        else Symtab.remove_list Task_Queue.eq_task entry nodes;
    in (execution_id, nodes', execs) end);


(* running execs *)

(*does exec_id have an entry in the execs table?*)
fun is_running_exec exec_id =
  let val {execs, ...} = get_state ()
  in Inttab.defined execs exec_id end;

(*register exec_id with the given groups and no prints, provided that
  execution_id is the current one and exec_id is not yet registered;
  the result tells whether the registration has happened*)
fun running execution_id exec_id groups =
  change_state_result (fn (current_id, nodes, execs) =>
    if execution_id = current_id andalso not (Inttab.defined execs exec_id)
    then (true, (current_id, nodes, Inttab.update (exec_id, (groups, [])) execs))
    else (false, (current_id, nodes, execs)));

(*apply f to the entry of a registered exec_id; Fail for an unregistered one*)
fun map_exec exec_id f =
  change_state (fn (execution_id, nodes, execs) =>
    (case Inttab.lookup execs exec_id of
      NONE => raise Fail (unregistered exec_id)
    | SOME entry => (execution_id, nodes, Inttab.update (exec_id, f entry) execs)));


(* exec groups and tasks *)

(*groups stored for exec_id; empty list if there is no entry*)
fun exec_groups (execs: execs) exec_id =
  the_default [] (Option.map #1 (Inttab.lookup execs exec_id));

(*Future.snapshot of all groups of the given exec_ids, taken within a state
  access that leaves the state unchanged; no state access for empty input*)
fun snapshot exec_ids =
  if null exec_ids then []
  else
    change_state_result (fn (args as (_, _, execs)) =>
      (Future.snapshot (maps (exec_groups execs) exec_ids), args));

(*repeat until the snapshot is empty: fork an identity future that depends
  on the snapshot tasks, join it, then take a new snapshot*)
fun join exec_ids =
  (case snapshot exec_ids of
    [] => ()
  | tasks =>
      let
        val barrier =
          (singleton o Future.forks)
            {name = "Execution.join", group = SOME (Future.new_group NONE),
              deps = tasks, pri = 0, interrupts = false} I;
        val _ = Future.join barrier;
      in join exec_ids end);

(*groups currently stored for exec_id*)
fun peek exec_id =
  let val {execs, ...} = get_state ()
  in exec_groups execs exec_id end;

(*cancel every group currently stored for exec_id*)
fun cancel exec_id = peek exec_id |> List.app Future.cancel_group;


(* fork *)

(*emit the given markups as status output; the task name is attached as
  property only if Multithreading.trace is at least 2*)
fun status task markups =
  let
    val trace_props =
      if ! Multithreading.trace < 2 then []
      else [(Markup.taskN, Task_Queue.str_of_task task)];
    val decorate = Markup.markup_only o Markup.properties trace_props;
  in Output.status (map decorate markups) end;

type params = {name: string, pos: Position.T, pri: int};

(*Fork e as future within a new worker subgroup.  The exec_id is parsed from
  pos (default 0) and must be registered, otherwise Fail is raised; the new
  group is added to its entry.  Status messages: "forked" by the forking
  thread; "running", then "finished" (preceded by "failed" for an exception
  result), then "joined" by the forked task.  Error messages of a failed
  result are reported via Future.error_message, except for exec_id 0.*)
fun fork ({name, pos, pri}: params) e =
  let
    fun forked () =
      let
        val exec_id = the_default 0 (Position.parse_id pos);
        val group = Future.worker_subgroup ();
        val _ = map_exec exec_id (fn (groups, prints) => (group :: groups, prints));

        fun run () =
          let
            val task = the (Future.worker_task ());
            val _ = status task [Markup.running];
            val result =
              Exn.capture (Future.interruptible_task e) ()
              |> Future.identify_result pos
              |> Exn.map_exn Runtime.thread_context;

            fun report_outcome () =
              (case result of
                Exn.Res _ => status task [Markup.finished]
              | Exn.Exn exn =>
                 (status task [Markup.failed];
                  status task [Markup.finished];
                  Output.report [Markup.markup_only (Markup.bad ())];
                  if exec_id <> 0
                  then List.app (Future.error_message pos) (Runtime.exn_messages exn)
                  else ()));

            (*failure of reporting is captured here and released only after "joined"*)
            val errors = Exn.capture report_outcome ();
            val _ = status task [Markup.joined];
          in Exn.release errors; Exn.release result end;

        val future =
          (singleton o Future.forks)
            {name = name, group = SOME group, deps = [], pri = pri, interrupts = false} run;
        val _ = status (Future.task_of future) [Markup.forked];
      in future end;
  in Thread_Attributes.uninterruptible (fn _ => Position.setmp_thread_data pos forked) () end;


(* print *)

(*record e as print for the exec_id parsed from pos (default 0);
  Fail for an unregistered exec_id*)
fun print ({name, pos, pri}: params) e =
  let
    val exec_id = the_default 0 (Position.parse_id pos);
    val item = {name = name, pri = pri, body = e};
  in map_exec exec_id (fn (groups, prints) => (groups, item :: prints)) end;

(*process the prints recorded for exec_id in order of recording: forked at
  the current thread position if Future.relevant holds for the list,
  otherwise run directly in the calling thread; the table entry itself is
  not modified; Fail for an unregistered exec_id*)
fun fork_prints exec_id =
  (case Inttab.lookup (#execs (get_state ())) exec_id of
    NONE => raise Fail (unregistered exec_id)
  | SOME (_, prints) =>
      let val pending = rev prints in
        if Future.relevant prints then
          let val pos = Position.thread_data () in
            pending |> List.app (fn {name, pri, body} =>
              ignore (fork {name = name, pos = pos, pri = pri} body))
          end
        else pending |> List.app (fn {body, ...} => body ())
      end);


(* cleanup *)

(*delete the entries of the given exec_ids; absent ids are tolerated*)
fun purge exec_ids =
  change_state (fn (execution_id, nodes, execs) =>
    let
      val execs' = fold Inttab.delete_safe exec_ids execs;

      fun check_canceled exec_id group =
        if Task_Queue.is_canceled group then ()
        else raise Fail ("Attempt to purge valid execution: " ^ Document_ID.print exec_id);

      (*NOTE(review): this fold ranges over execs' itself, so the
        "Inttab.defined execs'" test presumably always succeeds and
        check_canceled looks unreachable -- confirm whether the table before
        deletion was meant to be traversed here*)
      val () =
        Inttab.fold (fn (exec_id, (groups, _)) => fn () =>
          if Inttab.defined execs' exec_id then ()
          else List.app (check_canceled exec_id) groups) execs' ();
    in (execution_id, nodes, execs') end);

(*restore the initial state; the result consists of all groups that were
  stored in the execs table*)
fun reset () =
  change_state_result (fn (_, _, execs) =>
    let val groups = Inttab.fold (fn (_, (grps, _)) => fn acc => grps @ acc) execs []
    in (groups, (Document_ID.none, Symtab.empty, init_execs)) end);

(*Future.shutdown, then reset; exceptions from the status of the former
  groups are raised together via Par_Exn.make*)
fun shutdown () =
  let
    val () = Future.shutdown ();
    val exns = maps Task_Queue.group_status (reset ());
  in if null exns then () else raise Par_Exn.make exns end;

end;