1(*  Title:      Pure/Concurrent/task_queue.ML
2    Author:     Makarius
3
4Ordered queue of grouped tasks.
5*)
6
(*Interface of the task queue: nested groups, tasks, and the queue itself.*)
signature TASK_QUEUE =
sig
  (*groups: each group has an optional parent and a numeric id*)
  type group
  (*fresh group below the given optional parent*)
  val new_group: group option -> group
  val group_id: group -> int
  (*equality of groups by comparing their ids*)
  val eq_group: group * group -> bool
  (*record the given exception in the status of the group*)
  val cancel_group: group -> exn -> unit
  (*true if the group or one of its ancestors has a status recorded*)
  val is_canceled: group -> bool
  (*status of the group followed by the status of its ancestors*)
  val group_status: group -> exn list
  (*group id as string, enclosed in parentheses if the group is canceled*)
  val str_of_group: group -> string
  (*"/"-separated rendering of the group together with its ancestors*)
  val str_of_groups: group -> string
  (*tasks with priority >= urgent_pri are treated as urgent by the queue*)
  val urgent_pri: int
  (*tasks: named, numbered members of a group, with optional priority*)
  type task
  val dummy_task: task
  val group_of_task: task -> group
  val name_of_task: task -> string
  (*priority of the task, 0 if it has none*)
  val pri_of_task: task -> int
  val str_of_task: task -> string
  val str_of_task_groups: task -> string
  (*timing wrappers: run the given function and, if the task has a timing
    variable, update it with the elapsed time*)
  val running: task -> (unit -> 'a) -> 'a
  val joining: task -> (unit -> 'a) -> 'a
  val waiting: task -> task list -> (unit -> 'a) -> 'a
  (*queue: group membership plus job dependency graph*)
  type queue
  val empty: queue
  (*tasks registered for any of the given groups*)
  val group_tasks: queue -> group list -> task list
  val known_task: queue -> task -> bool
  (*true if the queue has neither a running nor a ready job*)
  val all_passive: queue -> bool
  val status: queue -> {ready: int, pending: int, running: int, passive: int, urgent: int}
  (*cancel the group; result: threads of running tasks registered for it*)
  val cancel: queue -> group -> Thread.thread list
  (*cancel the groups of all tasks; result: groups and threads of running tasks*)
  val cancel_all: queue -> group list * Thread.thread list
  (*remove the task; the boolean is Task_Graph.is_maximal of the task before removal*)
  val finish: task -> queue -> bool * queue
  (*register a new task that is already running on the given thread*)
  val enroll: Thread.thread -> string -> group -> queue -> task * queue
  (*register a new passive task with the given abort function*)
  val enqueue_passive: group -> string -> (unit -> bool) -> queue -> task * queue
  (*register a new job with name, group, dependencies and priority*)
  val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
  (*add a further job function to a task that has not been started; NONE otherwise*)
  val extend: task -> (bool -> bool) -> queue -> queue option
  (*turn a passive task into a running one: SOME true on success,
    SOME false if the task is not passive, NONE if it is unknown*)
  val dequeue_passive: Thread.thread -> task -> queue -> bool option * queue
  (*take the first ready job (restricted to urgent ones if the flag is set)*)
  val dequeue: Thread.thread -> bool -> queue -> (task * (bool -> bool) list) option * queue
  (*take a ready job among the given dependencies or, failing that, among
    their own dependencies; also returns the remaining known dependencies*)
  val dequeue_deps: Thread.thread -> task list -> queue ->
    (((task * (bool -> bool) list) option * task list) * queue)
end;
47
48structure Task_Queue: TASK_QUEUE =
49struct
50
51open Portable
52
(*shared id source: new_group and new_task both draw their ids from it*)
val new_id = Counter.make ();
54
55(** nested groups of tasks **)
56
57(* groups *)
58
(*A group consists of an optional parent, a numeric id, and a synchronized
  status that is NONE until cancel_group records an exception.*)
abstype group = Group of
 {parent: group option,
  id: int,
  status: exn option Synchronized.var}
with

fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};

(*fresh group below the given parent, with fresh id and empty status*)
fun new_group parent =
  let
    val id = new_id ();
    val status = Synchronized.var "group_status" NONE;
  in make_group (parent, id, status) end;

fun group_id (Group {id = n, ...}) = n;

(*groups are identified by their ids*)
fun eq_group (Group {id = id1, ...}, Group {id = id2, ...}) = (id1 = id2);

(*apply f to the group first, then to its parent, and so on up to the root*)
fun fold_groups f (g as Group {parent, ...}) a =
  (case parent of
    NONE => f g a
  | SOME g' => fold_groups f g' (f g a));


(* group status *)

(*combine the given exception with whatever is already recorded*)
fun cancel_group (Group {status, ...}) exn =
  let fun add_exn previous = SOME (Par_Exn.make (exn :: the_list previous))
  in Synchronized.change status add_exn end;

(*a group counts as canceled if it or any ancestor has a recorded status*)
fun is_canceled (Group {parent, status, ...}) =
  (case Synchronized.value status of
    SOME _ => true
  | NONE =>
      (case parent of
        SOME group => is_canceled group
      | NONE => false));

(*own status first, followed by the status of the ancestors*)
fun group_status (Group {parent, status, ...}) =
  let
    val own = the_list (Synchronized.value status);
    val inherited =
      (case parent of
        NONE => []
      | SOME group => group_status group);
  in own @ inherited end;

(*the id as string; canceled groups are enclosed in parentheses*)
fun str_of_group group =
  let val wrap = is_canceled group ? enclose "(" ")"
  in wrap (Int.toString (group_id group)) end;

fun str_of_groups group =
  fold_groups cons group []
  |> rev
  |> map str_of_group
  |> String.concatWith "/";

end;
97
98
99(* tasks *)
100
(*priority threshold: tasks with pri >= urgent_pri are counted as urgent*)
val urgent_pri = 1000;

(*accumulated timing information of a task*)
type timing = Time.time * Time.time * string list;  (*run, wait, wait dependencies*)

(*initial timing: zero run time, zero wait time, no wait dependencies*)
val timing_start = (Time.zeroTime, Time.zeroTime, []): timing;
106
(*a timing variable is only allocated when Multithreading.trace is at least 2*)
fun new_timing () =
  if ! Multithreading.trace >= 2
  then SOME (Synchronized.var "timing" timing_start)
  else NONE;
110
(*A task belongs to a group and carries a name, an id, an optional priority
  and an optional synchronized timing record.*)
abstype task = Task of
 {group: group,
  name: string,
  id: int,
  pri: int option,
  timing: timing Synchronized.var option}
with

(*placeholder task: id 0, empty name, no priority, no timing, own fresh group*)
val dummy_task =
  Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = NONE};

(*fresh task with a new id and, depending on new_timing, a timing variable*)
fun new_task group name pri =
  let
    val id = new_id ();
    val timing = new_timing ();
  in Task {group = group, name = name, id = id, pri = pri, timing = timing} end;

fun group_of_task (Task {group = g, ...}) = g;
fun name_of_task (Task {name = s, ...}) = s;

(*tasks without explicit priority report 0*)
fun pri_of_task (Task {pri = p, ...}) = the_default 0 p;

(*the id, followed by the name in parentheses unless the name is empty*)
fun str_of_task (Task {name, id, ...}) =
  let val id_str = Int.toString id
  in if name <> "" then id_str ^ " (" ^ name ^ ")" else id_str end;

fun str_of_task_groups task =
  String.concat [str_of_task task, " in ", str_of_groups (group_of_task task)];

(*Run e with the original thread attributes restored, measure the elapsed
  time, and feed it to the update function if the task has a timing variable.
  The result of e (value or exception) is passed through.*)
fun update_timing update (Task {timing, ...}) e =
  let
    fun record elapsed =
      (case timing of
        SOME var => Synchronized.change var (update elapsed)
      | NONE => ());
    fun timed restore_attributes () =
      let
        val t0 = Time.now ();
        val outcome = Exn.capture (restore_attributes e) ();
        val _ = record (Time.now () - t0);
      in Exn.release outcome end;
  in Thread_Attributes.uninterruptible timed () end;

(*order on (pri, id) pairs: flipped order on the optional priority, then
  the plain integer order on ids*)
fun task_ord (Task {id = id1, pri = pri1, ...}, Task {id = id2, pri = pri2, ...}) =
  let
    val pri_ord = flip_cmp (option_compare Int.compare);
    val ord = pair_compare (pri_ord, Int.compare);
  in ord ((pri1, id1), (pri2, id2)) end
end;
147
(*key structure for tables and graphs indexed by task, ordered via task_ord*)
structure TaskKEY = struct
  type key = task
  val ord = task_ord
  (*printing of keys via Poly/ML's generic value representation*)
  fun pp t = PolyML.prettyRepresentation(t,~1)
end
(*tables and graphs with tasks as keys*)
structure Tasks = Table(TaskKEY);
structure Task_Graph = Graph(TaskKEY);
155
156
157(* timing *)
158
(*add the elapsed time to the run component of the task timing*)
fun running task =
  let
    fun add_run (t: Time.time) ((run, wait, names): timing) : timing =
      (run + t, wait, names);
  in update_timing add_run task end;
161
(*subtract the elapsed time from the run component of the task timing*)
fun joining task =
  let
    fun sub_run (t: Time.time) ((run, wait, names): timing) : timing =
      (run - t, wait, names);
  in update_timing sub_run task end;
164
(*move the elapsed time from the run to the wait component; the names of the
  given dependencies are recorded as well if Multithreading.trace > 0*)
fun waiting task deps =
  let
    fun add_name dep names = op_insert equal (name_of_task dep) names;
    fun add_wait (t: Time.time) ((run, wait, names): timing) : timing =
      let
        val run' = run - t;
        val wait' = wait + t;
        val names' =
          if ! Multithreading.trace > 0 then foldl' add_name deps names else names;
      in (run', wait', names') end;
  in update_timing add_wait task end;
170
171
172
173(** queue of jobs and groups **)
174
175(* known group members *)
176
(*members of each group, indexed by group id*)
type groups = unit Tasks.table Inttab.table;

(*tasks registered under the given group id; empty table if there are none*)
fun get_tasks (groups: groups) gid =
  Inttab.lookup groups gid |> the_default Tasks.empty;
181
(*register the task under the given group id*)
fun add_task (gid, task) groups =
  let val members = Tasks.update (task, ()) (get_tasks groups gid)
  in Inttab.update (gid, members) groups end;
184
(*remove the task from the given group id; the group entry itself is dropped
  once no task is left*)
fun del_task (gid, task) groups =
  let val remaining = get_tasks groups gid |> Tasks.delete_safe task in
    if not (Tasks.is_empty remaining) then Inttab.update (gid, remaining) groups
    else Inttab.delete_safe gid groups
  end;
190
191
192(* job dependency graph *)
193
(*Job: job functions of a task that has not been started; extend adds new
    ones at the head and ready_job reverses the list.
  Running: the task has been handed to the given thread.
  Passive: only an abort function, which ready_job offers as the job once
    the group of the task is canceled.*)
datatype job =
  Job of (bool -> bool) list |
  Running of Thread.thread |
  Passive of unit -> bool;

(*graph of tasks with their jobs as node content*)
type jobs = job Task_Graph.T;
200
(*job stored for the task*)
fun get_job (jobs: jobs) task = task |> Task_Graph.get_node jobs;

(*replace the job stored for the task*)
fun set_job task job (jobs: jobs) = jobs |> Task_Graph.map_node task (K job);
203
(*add the edge (dep, task); the graph is returned unchanged if
  Task_Graph.UNDEF is raised*)
fun add_job task dep (jobs: jobs) =
  (jobs |> Task_Graph.add_edge (dep, task))
    handle Task_Graph.UNDEF _ => jobs;
206
207
208(* queue *)
209
(*queue state: group membership, job graph, and a counter that enqueue
  increments and dequeue/dequeue_deps decrement for tasks with
  priority >= urgent_pri*)
datatype queue = Queue of {groups: groups, jobs: jobs, urgent: int};

fun make_queue groups jobs urgent = Queue {groups = groups, jobs = jobs, urgent = urgent};
(*queue without groups or jobs, urgent counter 0*)
val empty = make_queue Inttab.empty Task_Graph.empty 0;
214
(*all tasks registered for at least one of the given groups*)
fun group_tasks (Queue {groups, ...}) gs =
  let
    fun add_group g tasks = Tasks.merge equal (tasks, get_tasks groups (group_id g));
  in Tasks.keys (foldl' add_group gs Tasks.empty) end;
220
(*true if looking up the entry of the task in the job graph succeeds*)
fun known_task (Queue {jobs, ...}) task =
  let val lookup = Task_Graph.get_entry jobs
  in can lookup task end;
222
223
224(* job status *)
225
(*A task is ready only if its dependency set is empty.  A Job then yields its
  functions in reversed list order; a Passive task yields its abort function,
  but only if its group is canceled; a Running task is never ready.*)
fun ready_job (task, (job, (deps, _))) =
  if not (Task_Graph.Keys.is_empty deps) then NONE
  else
    (case job of
      Job list => SOME (task, rev list)
    | Passive abort =>
        if is_canceled (group_of_task task) then SOME (task, [fn _ => abort ()])
        else NONE
    | Running _ => NONE);
233
(*like ready_job; with urgent_only set, tasks below urgent_pri are skipped*)
fun ready_job_urgent urgent_only (entry as (task, _)) =
  if urgent_only andalso pri_of_task task < urgent_pri then NONE
  else ready_job entry;
237
(*running tasks count as active (with an empty job list); everything else is
  active exactly when it is ready*)
fun active_job (entry as (task, (job, _))) =
  (case job of
    Running _ => SOME (task, [])
  | _ => ready_job entry);
240
(*true if the job graph contains no active job*)
fun all_passive (Queue {jobs, ...}) =
  (case Task_Graph.get_first active_job jobs of
    NONE => true
  | SOME _ => false);
243
244
245(* queue status *)
246
(*count the jobs by state: a Job is ready if its dependency set is empty and
  pending otherwise; the urgent counter is reported as stored in the queue*)
fun status (Queue {jobs, urgent, ...}) =
  let
    fun count (_, (job, (deps, _))) (n_ready, n_pending, n_running, n_passive) =
      (case job of
        Job _ =>
          if Task_Graph.Keys.is_empty deps
          then (n_ready + 1, n_pending, n_running, n_passive)
          else (n_ready, n_pending + 1, n_running, n_passive)
      | Running _ => (n_ready, n_pending, n_running + 1, n_passive)
      | Passive _ => (n_ready, n_pending, n_running, n_passive + 1));
    val (n_ready, n_pending, n_running, n_passive) =
      Task_Graph.fold count jobs (0, 0, 0, 0);
  in
    {ready = n_ready, pending = n_pending, running = n_running,
      passive = n_passive, urgent = urgent}
  end;
257
258
259
260(** task queue operations **)
261
262(* cancel -- peers and sub-groups *)
263
(*cancel the group with Exn.Interrupt and collect the threads (without
  duplicates) of the running tasks registered under the group id*)
fun cancel (Queue {groups, jobs, ...}) group =
  let
    val _ = cancel_group group Exn.Interrupt;
    fun add_running (task, _) threads =
      (case get_job jobs task of
        Running thread => op_insert (curry Thread.equal) thread threads
      | _ => threads);
  in Tasks.fold add_running (get_tasks groups (group_id group)) [] end;
276
(*cancel the group of every task in the job graph with Exn.Interrupt; the
  result collects, without duplicates, the groups and threads of the tasks
  that are currently running*)
fun cancel_all (Queue {jobs, ...}) =
  let
    fun cancel_job (task, (job, _)) (acc as (groups, threads)) =
      let
        val group = group_of_task task;
        val _ = cancel_group group Exn.Interrupt;
      in
        (case job of
          Running thread =>
            (op_insert (curry eq_group) group groups,
             op_insert (curry Thread.equal) thread threads)
        | _ => acc)
      end;
  in Task_Graph.fold cancel_job jobs ([], []) end;
291
292
293(* finish *)
294
(*remove the task from its group and all ancestor groups and from the job
  graph; the boolean is Task_Graph.is_maximal of the task in the graph
  before the removal*)
fun finish task (Queue {groups, jobs, urgent}) =
  let
    fun forget g = del_task (group_id g, task);
    val groups' = fold_groups forget (group_of_task task) groups;
    val jobs' = Task_Graph.del_node task jobs;
    val maximal = Task_Graph.is_maximal jobs task;
  in (maximal, make_queue groups' jobs' urgent) end;
302
303
304(* enroll *)
305
(*create a task without priority that is already running on the given
  thread, and register it with its group and all ancestor groups*)
fun enroll thread name group (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name NONE;
    fun register g = add_task (group_id g, task);
    val groups' = fold_groups register group groups;
    val jobs' = Task_Graph.new_node (task, Running thread) jobs;
  in (task, make_queue groups' jobs' urgent) end;
312
313
314(* enqueue *)
315
(*create a passive task without priority, holding the given abort function,
  and register it with its group and all ancestor groups*)
fun enqueue_passive group name abort (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name NONE;
    fun register g = add_task (group_id g, task);
    val groups' = fold_groups register group groups;
    val jobs' = Task_Graph.new_node (task, Passive abort) jobs;
  in (task, make_queue groups' jobs' urgent) end;
322
(*create a task with the given priority and a single job function, register
  it with its group and all ancestor groups, and add an edge from each
  dependency via add_job; the urgent counter grows by one if
  pri >= urgent_pri*)
fun enqueue name group deps pri job (Queue {groups, jobs, urgent}) =
  let
    val task = new_task group name (SOME pri);
    fun register g = add_task (group_id g, task);
    val groups' = fold_groups register group groups;
    val jobs_with_task = Task_Graph.new_node (task, Job [job]) jobs;
    val jobs' = foldl' (add_job task) deps jobs_with_task;
    val urgent' = urgent + (if pri >= urgent_pri then 1 else 0);
  in (task, make_queue groups' jobs' urgent') end;
332
(*put a further job function in front of the job list of the task; NONE if
  the task cannot be looked up or is not in the Job state*)
fun extend task job (Queue {groups, jobs, urgent}) =
  (case total (get_job jobs) task of
    SOME (Job list) =>
      let val jobs' = set_job task (Job (job :: list)) jobs
      in SOME (make_queue groups jobs' urgent) end
  | _ => NONE);
338
339
340(* dequeue *)
341
(*mark a passive task as running on the given thread: SOME true on success,
  SOME false if the task is in another state, NONE if its job cannot be
  looked up; the queue only changes in the first case*)
fun dequeue_passive thread task (queue as Queue {groups, jobs, urgent}) =
  (case total (get_job jobs) task of
    NONE => (NONE, queue)
  | SOME (Passive _) =>
      (SOME true, make_queue groups (set_job task (Running thread) jobs) urgent)
  | SOME _ => (SOME false, queue));
349
(*hand the first ready job to the given thread; with urgent_only set, nothing
  is searched unless the urgent counter is positive, and only tasks with
  priority >= urgent_pri are considered*)
fun dequeue thread urgent_only (queue as Queue {groups, jobs, urgent}) =
  if urgent_only andalso urgent <= 0 then (NONE, queue)
  else
    (case Task_Graph.get_first (ready_job_urgent urgent_only) jobs of
      NONE => (NONE, queue)
    | SOME (result as (task, _)) =>
        let
          val jobs' = set_job task (Running thread) jobs;
          val urgent' = if pri_of_task task < urgent_pri then urgent else urgent - 1;
        in (SOME result, make_queue groups jobs' urgent') end);
360
361
362(* dequeue wrt. dynamic dependencies *)
363
(*Look for a ready job among the given dependencies: first among the tasks
  themselves, then through the dependency sets of those that are not ready.
  The result also contains the dependencies that are still known to the
  queue; a job that is found is marked as running on the given thread.*)
fun dequeue_deps thread deps (queue as Queue {groups, jobs, urgent}) =
  let
    (*direct scan: tasks without entry are dropped, the first ready one ends
      the scan, the others are kept*)
    fun scan [] kept = (NONE, rev kept)
      | scan (task :: more) kept =
          (case total (Task_Graph.get_entry jobs) task of
            SOME (_, entry) =>
              (case ready_job (task, entry) of
                NONE => scan more (task :: kept)
              | found => (found, foldl' cons kept more))
          | NONE => scan more kept);

    (*indirect search: tasks that are not ready are marked as visited and
      replaced by the members of their dependency set*)
    fun search _ [] = NONE
      | search visited (task :: more) =
          if Tasks.defined visited task then search visited more
          else
            let
              val (_, entry as (_, (task_deps, _))) = Task_Graph.get_entry jobs task;
            in
              (case ready_job (task, entry) of
                NONE =>
                  search (Tasks.update (task, ()) visited)
                    (Task_Graph.Keys.dest task_deps @ more)
              | found => found)
            end;

    (*mark the task as running and adjust the urgent counter*)
    fun start (res as (task, _)) deps' =
      let
        val jobs' = set_job task (Running thread) jobs;
        val urgent' = if pri_of_task task < urgent_pri then urgent else urgent - 1;
      in ((SOME res, deps'), make_queue groups jobs' urgent') end;
  in
    (case scan deps [] of
      (SOME res, deps') => start res deps'
    | (NONE, deps') =>
        (case search Tasks.empty deps' of
          NONE => ((NONE, deps'), queue)
        | SOME res => start res deps'))
  end;
398
399end;
400