(*  Title:      Pure/Concurrent/task_queue.ML
    Author:     Makarius

Ordered queue of grouped tasks.
*)

signature TASK_QUEUE =
sig
  (*groups*)
  type group
  val new_group: group option -> group
  val group_id: group -> int
  val eq_group: group * group -> bool
  val cancel_group: group -> exn -> unit
  val is_canceled: group -> bool
  val group_status: group -> exn list
  val str_of_group: group -> string
  val str_of_groups: group -> string
  (*tasks*)
  val urgent_pri: int
  type task
  val dummy_task: task
  val group_of_task: task -> group
  val name_of_task: task -> string
  val pri_of_task: task -> int
  val str_of_task: task -> string
  val str_of_task_groups: task -> string
  (*timing*)
  val running: task -> (unit -> 'a) -> 'a
  val joining: task -> (unit -> 'a) -> 'a
  val waiting: task -> task list -> (unit -> 'a) -> 'a
  (*queue*)
  type queue
  val empty: queue
  val group_tasks: queue -> group list -> task list
  val known_task: queue -> task -> bool
  val all_passive: queue -> bool
  val status: queue -> {ready: int, pending: int, running: int, passive: int, urgent: int}
  val cancel: queue -> group -> Thread.thread list
  val cancel_all: queue -> group list * Thread.thread list
  val finish: task -> queue -> bool * queue
  val enroll: Thread.thread -> string -> group -> queue -> task * queue
  val enqueue_passive: group -> string -> (unit -> bool) -> queue -> task * queue
  val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
  val extend: task -> (bool -> bool) -> queue -> queue option
  val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
  val dequeue: Thread.thread -> bool -> queue -> (task * (bool -> bool) list) option * queue
  val dequeue_deps: Thread.thread -> task list -> queue ->
    (((task * (bool -> bool) list) option * task list) * queue)
end;

structure Task_Queue: TASK_QUEUE =
struct

open Portable

(*one counter supplies the ids of both groups and tasks*)
val new_id = Counter.make ();


(** nested groups of tasks **)

(* groups *)

abstype group = Group of
 {parent: group option,
  id: int,
  status: exn option Synchronized.var}
with

(*fresh group below the optional parent; its own status starts out as NONE*)
fun new_group parent =
  Group {parent = parent, id = new_id (), status = Synchronized.var "group_status" NONE};

fun group_id (Group {id, ...}) = id;

(*groups are identified by their id alone*)
fun eq_group (group1, group2) = group_id group1 = group_id group2;

(*apply f to the given group first, then to each of its ancestors in turn*)
fun fold_groups f (g as Group {parent, ...}) a =
  (case parent of
    NONE => f g a
  | SOME up => fold_groups f up (f g a));


(* group status *)

(*record exn in the status of this group, in front of whatever was stored before*)
fun cancel_group (Group {status, ...}) exn =
  Synchronized.change status (fn previous =>
    SOME (Par_Exn.make
      (case previous of
        NONE => [exn]
      | SOME old => [exn, old])));

(*a group counts as canceled as soon as it or one of its ancestors has a status*)
fun is_canceled (Group {parent, status, ...}) =
  (case Synchronized.value status of
    SOME _ => true
  | NONE =>
      (case parent of
        NONE => false
      | SOME up => is_canceled up));

(*status exceptions of this group followed by those of its ancestors*)
fun group_status (Group {parent, status, ...}) =
  let
    val own =
      (case Synchronized.value status of
        NONE => []
      | SOME exn => [exn]);
    val inherited =
      (case parent of
        NONE => []
      | SOME up => group_status up);
  in own @ inherited end;

(*group id, wrapped in parentheses when the group is canceled*)
fun str_of_group group =
  let val id_str = Int.toString (group_id group)
  in if is_canceled group then enclose "(" ")" id_str else id_str end;

(*the group and its ancestors (innermost first), separated by "/"*)
fun str_of_groups group =
  fold_groups cons group []
  |> rev
  |> map str_of_group
  |> String.concatWith "/";

end;


(* tasks *)

val urgent_pri = 1000;

type timing = Time.time * Time.time * string list;  (*run, wait, wait dependencies*)

val timing_start = (Time.zeroTime, Time.zeroTime, []): timing;

(*timing information is only allocated at trace level 2 or above*)
fun new_timing () =
  if ! Multithreading.trace >= 2
  then SOME (Synchronized.var "timing" timing_start)
  else NONE;

abstype task = Task of
 {group: group,
  name: string,
  id: int,
  pri: int option,
  timing: timing Synchronized.var option}
with

val dummy_task =
  Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = NONE};

fun new_task group name pri =
  let
    val id = new_id ();
    val timing = new_timing ();
  in Task {group = group, name = name, id = id, pri = pri, timing = timing} end;

fun group_of_task (Task {group, ...}) = group;
fun name_of_task (Task {name, ...}) = name;

(*tasks without explicit priority report 0*)
fun pri_of_task (Task {pri, ...}) =
  (case pri of
    SOME p => p
  | NONE => 0);

fun str_of_task (Task {name, id, ...}) =
  let val id_str = Int.toString id
  in if name = "" then id_str else id_str ^ " (" ^ name ^ ")" end;

fun str_of_task_groups task = str_of_task task ^ " in " ^ str_of_groups (group_of_task task);

(*run e (with the restored thread attributes) and afterwards apply "update elapsed"
  to the timing variable of the task, if it has one; the result or exception of e
  is passed through unchanged*)
fun update_timing update (Task {timing, ...}) e =
  let
    fun note_elapsed elapsed =
      (case timing of
        SOME var => Synchronized.change var (update elapsed)
      | NONE => ());
    fun timed restore_attributes () =
      let
        val t0 = Time.now ();
        val outcome = Exn.capture (restore_attributes e) ();
        val _ = note_elapsed (Time.now () - t0);
      in Exn.release outcome end;
  in Thread_Attributes.uninterruptible timed () end;

(*lexicographic order on (pri, id): the priority comparison is flipped, ids are not*)
fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
  let val pri_ord = flip_cmp (option_compare Int.compare)
  in pair_compare (pri_ord, Int.compare) ((pri1, id1), (pri2, id2)) end

end;

structure TaskKEY =
struct
  type key = task
  val ord = task_ord
  fun pp t = PolyML.prettyRepresentation(t,~1)
end
structure Tasks = Table(TaskKEY);
structure Task_Graph = Graph(TaskKEY);


(* timing *)

(*elapsed time is added to the run component*)
fun running task =
  update_timing (fn elapsed => fn (run, wait, names) => (run + elapsed, wait, names)) task;

(*elapsed time is subtracted from the run component*)
fun joining task =
  update_timing (fn elapsed => fn (run, wait, names) => (run - elapsed, wait, names)) task;

(*elapsed time is moved from the run to the wait component; when tracing is
  enabled the names of deps are inserted into the wait dependencies*)
fun waiting task deps =
  update_timing (fn elapsed => fn (run, wait, names) =>
    (run - elapsed,
     wait + elapsed,
     if ! Multithreading.trace > 0
     then foldl' (op_insert equal o name_of_task) deps names
     else names)) task;



(** queue of jobs and groups **)

(* known group members *)

(*group id -> set of member tasks*)
type groups = unit Tasks.table Inttab.table;

fun get_tasks (groups: groups) gid =
  (case Inttab.lookup groups gid of
    SOME members => members
  | NONE => Tasks.empty);

fun add_task (gid, task) groups =
  let val members = Tasks.update (task, ()) (get_tasks groups gid)
  in Inttab.update (gid, members) groups end;

(*groups that become empty are dropped from the table*)
fun del_task (gid, task) groups =
  let val remaining = Tasks.delete_safe task (get_tasks groups gid) in
    if not (Tasks.is_empty remaining) then Inttab.update (gid, remaining) groups
    else Inttab.delete_safe gid groups
  end;

(*enter the task as member of its group and of every ancestor group*)
fun register_task task group groups =
  fold_groups (fn g => add_task (group_id g, task)) group groups;


(* job dependency graph *)

datatype job =
  Job of (bool -> bool) list |  (*most recently added function first*)
  Running of Thread.thread |
  Passive of unit -> bool;

type jobs = job Task_Graph.T;

fun get_job (jobs: jobs) task = Task_Graph.get_node jobs task;
fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn _ => job) jobs;

(*edge from dep to task; undefined nodes are silently ignored*)
fun add_job task dep (jobs: jobs) =
  Task_Graph.add_edge (dep, task) jobs
    handle Task_Graph.UNDEF _ => jobs;


(* queue *)

(*urgent is incremented by enqueue and decremented on dequeue for tasks
  with priority >= urgent_pri*)
datatype queue = Queue of {groups: groups, jobs: jobs, urgent: int};

fun make_queue groups jobs urgent = Queue {groups = groups, jobs = jobs, urgent = urgent};
val empty = make_queue Inttab.empty Task_Graph.empty 0;

(*all tasks registered for any of the given groups*)
fun group_tasks (Queue {groups, ...}) gs =
  let
    fun merge_group g acc = Tasks.merge equal (acc, get_tasks groups (group_id g));
  in Tasks.keys (foldl' merge_group gs Tasks.empty) end;

fun known_task (Queue {jobs, ...}) task = can (Task_Graph.get_entry jobs) task;


(* job status *)

(*a Job without remaining dependencies is ready, with its functions in the order
  they were added; a Passive job is ready only when its group is canceled, and
  then yields its abort function; Running jobs are never ready*)
fun ready_job (task, (job, (deps, _))) =
  (case job of
    Job list =>
      if Task_Graph.Keys.is_empty deps then SOME (task, rev list) else NONE
  | Passive abort =>
      if Task_Graph.Keys.is_empty deps andalso is_canceled (group_of_task task)
      then SOME (task, [fn _ => abort ()])
      else NONE
  | Running _ => NONE);

(*like ready_job, optionally restricted to tasks of urgent priority*)
fun ready_job_urgent urgent_only (entry as (task, _)) =
  if urgent_only andalso pri_of_task task < urgent_pri then NONE
  else ready_job entry;

(*running or ready*)
fun active_job (entry as (task, (job, _))) =
  (case job of
    Running _ => SOME (task, [])
  | _ => ready_job entry);

fun all_passive (Queue {jobs, ...}) =
  (case Task_Graph.get_first active_job jobs of
    NONE => true
  | SOME _ => false)


(* queue status *)

(*Job nodes count as ready or pending depending on whether dependencies remain*)
fun status (Queue {jobs, urgent, ...}) =
  let
    fun count (_, (job, (deps, _))) (n_ready: int, n_pending: int, n_running: int, n_passive: int) =
      (case job of
        Job _ =>
          if Task_Graph.Keys.is_empty deps
          then (n_ready + 1, n_pending, n_running, n_passive)
          else (n_ready, n_pending + 1, n_running, n_passive)
      | Running _ => (n_ready, n_pending, n_running + 1, n_passive)
      | Passive _ => (n_ready, n_pending, n_running, n_passive + 1));
    val (n_ready, n_pending, n_running, n_passive) =
      Task_Graph.fold count jobs (0, 0, 0, 0);
  in
    {ready = n_ready, pending = n_pending, running = n_running,
      passive = n_passive, urgent = urgent}
  end;



(** task queue operations **)

(* cancel -- peers and sub-groups *)

(*cancel the group with Exn.Interrupt and return the threads of its running members*)
fun cancel (Queue {groups, jobs, ...}) group =
  let
    val _ = cancel_group group Exn.Interrupt;
    fun add_running (task, _) threads =
      (case get_job jobs task of
        Running thread => op_insert (curry Thread.equal) thread threads
      | _ => threads);
  in Tasks.fold add_running (get_tasks groups (group_id group)) [] end;

(*cancel the group of every job with Exn.Interrupt; the result collects the groups
  and threads of the running jobs only*)
fun cancel_all (Queue {jobs, ...}) =
  let
    fun cancel_job (task, (job, _)) (acc as (groups, threads)) =
      let
        val group = group_of_task task;
        val _ = cancel_group group Exn.Interrupt;
      in
        (case job of
          Running thread =>
            (op_insert (curry eq_group) group groups,
             op_insert (curry Thread.equal) thread threads)
        | _ => acc)
      end;
  in Task_Graph.fold cancel_job jobs ([], []) end;


(* finish *)

(*remove the task from all its groups and from the job graph; the flag is the
  result of Task_Graph.is_maximal on the graph before removal*)
fun finish task (Queue {groups, jobs, urgent}) =
  let
    val groups' =
      fold_groups (fn g => del_task (group_id g, task)) (group_of_task task) groups;
    val jobs' = Task_Graph.del_node task jobs;
    val maximal = Task_Graph.is_maximal jobs task;
  in (maximal, make_queue groups' jobs' urgent) end;


(* enroll *)

(*new task that is immediately marked as running on the given thread*)
fun enroll thread name group (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name NONE;
    val groups' = register_task task group groups;
    val jobs' = Task_Graph.new_node (task, Running thread) jobs;
  in (task, make_queue groups' jobs' urgent) end;


(* enqueue *)

(*new passive task with the given abort function*)
fun enqueue_passive group name abort (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name NONE;
    val groups' = register_task task group groups;
    val jobs' = Task_Graph.new_node (task, Passive abort) jobs;
  in (task, make_queue groups' jobs' urgent) end;

(*new task with a single job function, depending on deps*)
fun enqueue name group deps pri job (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name (SOME pri);
    val groups' = register_task task group groups;
    val jobs' =
      jobs
      |> Task_Graph.new_node (task, Job [job])
      |> foldl' (add_job task) deps;
    val urgent' = if pri >= urgent_pri then urgent + 1 else urgent;
  in (task, make_queue groups' jobs' urgent') end;

(*add another function to a task that is still a Job; NONE otherwise*)
fun extend task job (Queue {groups, jobs, urgent}) =
  (case total (get_job jobs) task of
    SOME (Job pending) =>
      SOME (make_queue groups (set_job task (Job (job :: pending)) jobs) urgent)
  | _ => NONE)


(* dequeue *)

(*mark task as running on thread, adjusting the urgent counter*)
fun start_task thread task (Queue {groups, jobs, urgent}) =
  let
    val jobs' = set_job task (Running thread) jobs;
    val urgent' = if pri_of_task task >= urgent_pri then urgent - 1 else urgent;
  in make_queue groups jobs' urgent' end;

(*SOME true: passive task is now running; SOME false: task is known but not
  passive; NONE: no job could be retrieved for the task*)
fun dequeue_passive thread task (queue as Queue {groups, jobs, urgent}) =
  (case total (get_job jobs) task of
    NONE => (NONE, queue)
  | SOME (Passive _) =>
      (SOME true, make_queue groups (set_job task (Running thread) jobs) urgent)
  | SOME _ => (SOME false, queue))

(*first ready job of the graph; with urgent_only the search is skipped altogether
  unless the urgent counter is positive*)
fun dequeue thread urgent_only (queue as Queue {jobs, urgent, ...}) =
  if urgent_only andalso urgent <= 0 then (NONE, queue)
  else
    (case Task_Graph.get_first (ready_job_urgent urgent_only) jobs of
      NONE => (NONE, queue)
    | SOME (found as (task, _)) => (SOME found, start_task thread task queue));


(* dequeue wrt. dynamic dependencies *)

fun dequeue_deps thread deps (queue as Queue {jobs, ...}) =
  let
    (*first ready task among the given ones; tasks unknown to the graph are
      dropped from the returned list, all others are kept in order*)
    fun ready [] kept = (NONE, rev kept)
      | ready (task :: more) kept =
          (case total (Task_Graph.get_entry jobs) task of
            NONE => ready more kept
          | SOME (_, entry) =>
              (case ready_job (task, entry) of
                NONE => ready more (task :: kept)
              | found => (found, foldl' cons kept more)));

    (*search through the dependencies of tasks that are not ready themselves,
      visiting each task at most once*)
    fun ready_dep _ [] = NONE
      | ready_dep visited (task :: more) =
          if Tasks.defined visited task then ready_dep visited more
          else
            let val entry as (_, (ds, _)) = #2 (Task_Graph.get_entry jobs task) in
              (case ready_job (task, entry) of
                NONE =>
                  ready_dep (Tasks.update (task, ()) visited) (Task_Graph.Keys.dest ds @ more)
              | found => found)
            end;

    fun result (res as (task, _)) deps' =
      ((SOME res, deps'), start_task thread task queue);

    val (direct, deps') = ready deps [];
  in
    (case direct of
      SOME res => result res deps'
    | NONE =>
        (case ready_dep Tasks.empty deps' of
          SOME res => result res deps'
        | NONE => ((NONE, deps'), queue)))
  end;

end;