1structure ProcessMultiplexor : ProcessMultiplexor =
2struct
3
4  infix |>
5  fun x |> f = f x
6  fun K x y = x
7
8  fun assoc1 k [] = NONE
9    | assoc1 k ((p as (k',v))::rest) = if k = k' then SOME p else assoc1 k rest
10
11  type pid = Posix.ProcEnv.pid
12  val pidToWord  = Posix.Process.pidToWord
13  type exit_status = Posix.Process.exit_status
14
15  type command = {executable: string, nm_args : string list, env : string list}
16  type 'a job = {tag : string, command : command, update : 'a * bool -> 'a}
17  datatype 'a genjob_result =
18           NoMoreJobs of 'a | NewJob of ('a job * 'a) | GiveUpAndDie of 'a
19  type 'a workprovider = { initial : 'a, genjob : 'a -> 'a genjob_result }
20
21  type 'a working_job = {
22    tag : string,
23    command : command,
24    update : 'a * bool -> 'a,
25    starttime : Time.time,
26    lastevent : Time.time,
27    out : TextIO.instream,
28    err : TextIO.instream,
29    outeof : bool,
30    erreof : bool,
31    pid : pid
32  }
33  type jobkey = pid * string
34  datatype strmtype = OUT | ERR
35  datatype monitor_message =
36           Output of jobkey * Time.time * strmtype * string
37         | NothingSeen of jobkey * {delay: Time.time, total_elapsed : Time.time}
38         | Terminated of jobkey * exit_status * Time.time
39         | MonitorKilled of jobkey * Time.time
40         | EOF of jobkey * strmtype * Time.time
41         | StartJob of jobkey
42  datatype client_cmd = Kill of jobkey | KillAll
43  type monitor = monitor_message -> client_cmd option
44
45  local
46    open FunctionalRecordUpdate
47    fun makeUpdateWJ z = makeUpdate10 z (* 10 fields *)
48    fun makeUpdateWL z = makeUpdate4 z (* 4 fields *)
49  in
50    fun updateWJ z = let
51      fun from tag command update starttime lastevent out err
52               outeof erreof pid =
53          {tag = tag, command = command, update = update, starttime = starttime,
54           lastevent = lastevent, out = out, err = err, outeof = outeof,
55           erreof = erreof, pid = pid}
56      fun from' pid erreof outeof err out lastevent starttime update command
57                tag =
58          {tag = tag, command = command, update = update, starttime = starttime,
59           lastevent = lastevent, out = out, err = err, outeof = outeof,
60           erreof = erreof, pid = pid}
61      fun to f {tag, command, update, starttime, lastevent, out,
62                err, outeof, erreof, pid} =
63        f  tag command update starttime lastevent out err
64           outeof erreof pid
65    in
66      makeUpdateWJ (from, from', to)
67    end z
68    fun updateWL z = let
69      fun from current_jobs current_state worklimit genjob =
70        {current_state = current_state, current_jobs = current_jobs,
71         worklimit = worklimit, genjob = genjob}
72      fun from' genjob worklimit current_state current_jobs =
73        {current_state = current_state, current_jobs = current_jobs,
74         worklimit = worklimit, genjob = genjob}
75      fun to f {current_state, current_jobs, worklimit, genjob} =
76        f current_jobs current_state worklimit genjob
77    in
78      makeUpdateWL (from, from', to)
79    end z
80
81    val U = U
82    val $$ = $$
83  end
84
85  fun touch (wj : 'a working_job) : 'a working_job =
86    updateWJ wj (U #lastevent (Time.now())) $$
87
88  fun mkTIO_instream fd =
89  let
90    open Posix.IO
91    val (flags,_) = getfl fd
92    val rdr = mkTextReader { fd = fd, name = "", initBlkMode = not(O.allSet(O.nonblock,flags)) }
93  in
94    TextIO.mkInstream (TextIO.StreamIO.mkInstream (rdr, ""))
95  end
96
97  fun jobkey_compare((p1,s1), (p2,s2)) =
98    case SysWord.compare(pidToWord p1, pidToWord p2) of
99        EQUAL => String.compare(s1,s2)
100      | x => x
101  fun wjkey ({tag,pid,...} : 'a working_job) = (pid,tag)
102  fun wjk_member x [] = false
103    | wjk_member x (h::t) = jobkey_compare(x,h) = EQUAL orelse wjk_member x t
104
105  type 'a worklist = {
106    current_jobs : (jobkey, 'a working_job) Binarymap.dict,
107    current_state : 'a,
108    worklimit : int,
109    genjob : 'a -> 'a genjob_result
110  }
111
112  fun inStreamInPoll (strm : TextIO.instream) =
113    let
114      val (rd as TextPrimIO.RD{ioDesc,...}, buf) =
115          TextIO.StreamIO.getReader(TextIO.getInstream strm)
116      val _ =
117          TextIO.setInstream (strm, TextIO.StreamIO.mkInstream(rd,buf))
118    in
119      case ioDesc of
120          NONE => raise Fail "Can't poll instream"
121        | SOME d => OS.IO.pollIn (valOf (OS.IO.pollDesc d))
122                    handle Option => raise Fail "Can't poll instream"
123    end
124
125  fun workjob_polls (wj : 'a working_job) =
126    [(inStreamInPoll (#out wj), (OUT, wj)),
127     (inStreamInPoll (#err wj), (ERR, wj))]
128
129  fun worklist_polls (wl : 'a worklist) =
130    Binarymap.foldl (fn (_, wj, acc) => workjob_polls wj @ acc) []
131                    (#current_jobs wl)
132
133  fun new_worklist {worklimit : int,provider : 'a workprovider} : 'a worklist =
134    {current_jobs = Binarymap.mkDict jobkey_compare,
135     genjob = #genjob provider,
136     current_state = #initial provider,
137     worklimit = worklimit}
138
139  fun fupdjob k f (wl : 'a worklist) : 'a worklist =
140    let
141      val cj = #current_jobs wl
142      val cj' = Binarymap.insert(cj, k, f (Binarymap.peek(cj, k)))
143    in
144      updateWL wl (U #current_jobs cj') $$
145    end
146  fun cjs_addjob (wj : 'a working_job) d = Binarymap.insert(d, wjkey wj, wj)
147  fun addjob (wj:'a working_job) = fupdjob (wjkey wj) (fn _ => wj)
148
149  fun updstate s (wl : 'a worklist) : 'a worklist =
150    updateWL wl (U #current_state s) $$
151
152  fun start_job (j : 'a job) : 'a working_job =
153    let
154      open Posix.Process Posix.IO
155      val {tag, command, update} = j
156      val {executable,env,nm_args} = command
157      val {infd=outinfd, outfd = outoutfd} = pipe()
158      val {infd=errinfd, outfd = erroutfd} = pipe()
159      val {infd=ininfd,  outfd = inoutfd} = pipe()
160    in
161      case fork() of
162          NONE =>
163          let
164            val () = dup2 {old = outoutfd, new = Posix.FileSys.stdout}
165            val () = dup2 {old = erroutfd, new = Posix.FileSys.stderr}
166            val () = dup2 {old = ininfd, new = Posix.FileSys.stdin}
167            val () =
168                List.app close [errinfd, erroutfd, outinfd, outoutfd,
169                                ininfd, inoutfd]
170          in
171            exece(executable,nm_args,env)
172          end
173        | SOME pid =>
174          let
175            val out = mkTIO_instream outinfd
176            val err = mkTIO_instream errinfd
177            val () = List.app close [outoutfd, erroutfd, ininfd, inoutfd]
178          in
179            {
180              tag = tag,
181              command = command,
182              update = update,
183              out = out, outeof = false,
184              err = err, erreof = false,
185              pid = pid,
186              starttime = Time.now(),
187              lastevent = Time.now()
188            }
189          end
190    end
191
192  fun mk_shell_command {cline,extra_env} : command =
193    {executable = "/bin/sh", nm_args = ["/bin/sh", "-c", cline],
194     env = extra_env @ Posix.ProcEnv.environ()}
195  fun simple_shell s = mk_shell_command {cline = s, extra_env = []}
196  fun shellcommand s =
197    let
198      open Posix.Process
199      val j :int job = {tag = s, command = simple_shell s, update = K 0}
200      val wj = start_job j
201      fun read pfx acc strm k =
202        case TextIO.inputLine strm of
203            NONE => k acc
204          | SOME s => read pfx ((pfx^s)::acc) strm k
205    in
206      read "" [] (#out wj) (fn a => read "ERR: " a (#err wj) List.rev) before
207      ignore (waitpid (W_CHILD (#pid wj), []))
208    end
209
210  fun markeof0 chan (wj : 'a working_job) : 'a working_job =
211    case chan of
212        OUT => updateWJ wj (U #outeof true) $$
213      | ERR => updateWJ wj (U #erreof true) $$
214
215  fun markeof chan wj = wj |> markeof0 chan |> touch
216
217  fun chan_name OUT = "OUT"
218    | chan_name ERR = "ERR"
219
220  fun fill_workq monitorfn (acc as (cmds, wl : 'a worklist)) =
221    let
222      val {current_jobs,current_state,genjob,worklimit,...} = wl
223    in
224      if Binarymap.numItems current_jobs >= worklimit then acc
225      else
226        case genjob current_state of
227            NoMoreJobs s' => (cmds, updstate s' wl)
228          | NewJob (job, state') =>
229            let
230              val wj = start_job job
231              val cmds' = case monitorfn (StartJob (wjkey wj)) of
232                              NONE => cmds
233                            | SOME c => c::cmds
234            in
235              fill_workq monitorfn
236                         (cmds', wl |> addjob wj |> updstate state')
237            end
238          | GiveUpAndDie s' => (KillAll :: cmds, updstate s' wl)
239    end
240
241  fun text_monitor m =
242    let
243      open Posix.Process
244      fun p0 tag t msg killp =
245        (print (tag ^ "(" ^ Time.toString t ^ ")  " ^ msg ^ "\n");
246         killp)
247      fun p tag t msg = p0 tag t msg NONE
248    in
249      case m of
250          Output((pid,tag), t, chan, s) =>
251            p tag t ("["^chan_name chan^"]: " ^ s)
252        | NothingSeen ((pid,tag), {delay,total_elapsed}) =>
253            p tag total_elapsed ("delayed " ^ Time.toString delay)
254        | Terminated((pid,tag), st, t) =>
255          p0 tag t ("exited " ^ (if st = W_EXITED then "OK" else "FAILED"))
256             (if st = W_EXITED then NONE else SOME KillAll)
257        | MonitorKilled((pid,tag), t) => p tag t "monitor-killed"
258        | EOF ((pid,tag), chan, t) =>
259            p tag t ("EOF on " ^ chan_name chan)
260        | StartJob (pid,tag) => p tag (Time.fromSeconds 0) "beginning"
261    end
262
263  fun wjstrm ERR (wj:'a working_job) = #err wj
264    | wjstrm OUT wj = #out wj
265
266  fun killjob mfn (jk:jobkey) wl =
267    let
268      open Posix.Process
269      val cjs = #current_jobs wl
270      val job = Binarymap.find (cjs, jk)
271      val pid = #pid job
272      val state = #update job (#current_state wl, false)
273    in
274      kill (K_PROC pid, Posix.Signal.kill);
275      waitpid(W_CHILD pid, []);
276      ignore (mfn (MonitorKilled(jk,Time.-(Time.now(),#starttime job))));
277      updateWL wl
278               (U #current_state state)
279               (U #current_jobs (#1 (Binarymap.remove(cjs, jk)))) $$
280    end
281
282  fun killall mfn (wl : 'a worklist) =
283    Binarymap.foldl (fn (k,_,acc) => killjob mfn k acc)
284                    wl
285                    (#current_jobs wl)
286
287  fun execute_cmds mfn cmds wl =
288    case cmds of
289        [] => wl
290      | KillAll :: rest =>
291          wl |> killall mfn
292             |> (fn wl => updateWL wl (U #genjob NoMoreJobs) $$)
293             |> execute_cmds mfn rest
294      | Kill jk :: rest =>
295          wl |> killjob mfn jk |> execute_cmds mfn rest
296
297  fun elapsed wj = Time.-(Time.now(), #starttime wj)
298
299  fun do_work (wl0 : 'a worklist, monitorfn) =
300    let
301      open Posix.Process
302      val (cmds, wl1) = fill_workq monitorfn ([], wl0)
303      fun monitor msg (acc as (cmds, wl)) =
304        case monitorfn msg of
305            NONE => acc
306          | SOME c => (c::cmds, wl)
307      fun nothing wj (cmds, wl) =
308        let
309          val msg =
310              NothingSeen (wjkey wj, {delay = Time.-(Time.now(), #lastevent wj),
311                                      total_elapsed = elapsed wj})
312        in
313          monitor msg (cmds, addjob wj wl)
314        end
315      fun exitstatus wj status (cs, wl) =
316        let
317          val msg = Terminated (wjkey wj, status, elapsed wj)
318          val newstate = #update wj (#current_state wl, status = W_EXITED)
319        in
320          monitor msg (cs, updateWL wl (U #current_state newstate) $$)
321        end
322      fun eof wj chan (cmds, wl) =
323        monitor (EOF (wjkey wj, chan, elapsed wj))
324                (cmds, addjob (markeof chan wj) wl)
325      fun is_neweof wj chan =
326        case chan of
327            ERR => not (#erreof wj)
328          | OUT => not (#outeof wj)
329      fun dowait didio (k (* key *), wj, acc as (cmds,wl)) =
330        if wjk_member k didio then (cmds, addjob wj wl)
331        else
332          case waitpid_nh(W_CHILD (#pid wj), []) of
333              NONE => nothing wj acc
334            | SOME (_, status) => exitstatus wj status acc
335
336      fun workloop didio (cmds, wl) =
337        let
338          val empty_jobs = Binarymap.mkDict jobkey_compare
339        in
340          Binarymap.foldl (dowait didio)
341                          (cmds, updateWL wl (U #current_jobs empty_jobs) $$)
342                          (#current_jobs wl)
343        end
344
345      fun loop (cmds, wl : 'a worklist) : 'a =
346        if Binarymap.numItems (#current_jobs wl) = 0 then #current_state wl
347        else
348          let
349            val polls = worklist_polls wl
350            val active =
351                OS.IO.poll(map #1 polls, SOME (Time.fromMilliseconds 100))
352            fun foldthis (pi, (acc as (cmds, wl),didio)) =
353              let
354                val iod = OS.IO.pollToIODesc (OS.IO.infoToPollDesc pi)
355              in
356                case List.find (fn (pd,_) => iod = OS.IO.pollToIODesc pd) polls
357                 of
358                    NONE => raise Fail "Couldn't find poll-data in List.find"
359                  | SOME (_, (chan, wj)) =>
360                    let
361                      val s = TextIO.input (wjstrm chan wj)
362                      val didio' = wjkey wj :: didio
363                    in
364                      if size s = 0 then
365                        if is_neweof wj chan then (eof wj chan acc, didio')
366                        else (acc,didio)
367                      else
368                        let
369                          val msg = Output(wjkey wj, elapsed wj, chan, s)
370                        in
371                          (monitor msg (cmds, addjob (touch wj) wl), didio')
372                        end
373                    end
374              end
375            val ((cmds, wl), didio) =
376                List.foldl foldthis ((cmds,wl), []) active
377            val (cmds, wl) = workloop didio (cmds, wl)
378            val wl = execute_cmds monitorfn cmds wl
379          in
380            loop (fill_workq monitorfn ([], wl))
381          end
382    in
383      loop (cmds, wl1)
384    end
385
386  fun fupdAlist k f [] = raise Fail "updAlist: No element with given key"
387    | fupdAlist k f ((k',v') :: rest) =
388      if k=k' then (k,f v') :: rest
389      else (k',v') :: fupdAlist k f rest
390  fun findUpd P f k [] = k (NONE, [])
391    | findUpd P f k (x::xs) =
392      if P x then k (SOME (f x), f x :: xs)
393      else findUpd P f (fn (res, l) => k (res, x::l)) xs
394
395
396  fun shell_commands m (cmds0, n) =
397    let
398      datatype stat = Waiting | Running | Done of bool
399      val (cmds00, _) =
400          List.foldl
401            (fn (c, (cs, n)) => ((str (chr n), (c, Waiting))::cs, n + 1))
402            ([], 65)
403            cmds0
404      val cmds = List.rev cmds00
405      fun genjob clist =
406        let
407          val (cdata, l) = findUpd (fn (_, (_, s)) => s = Waiting)
408                                   (fn (k, (c, _)) => (k, (c, Running)))
409                                   (fn x => x)
410                                   clist
411        in
412          case cdata of
413              NONE => NoMoreJobs clist
414            | SOME (t, (c, _)) =>
415              let
416                fun upd(clist, b) = fupdAlist t (fn (c,_) => (c,Done b)) clist
417              in
418                NewJob ({tag = t, command = simple_shell c, update = upd}, l)
419              end
420        end
421      val wl =
422          new_worklist {
423            provider = {initial = cmds, genjob = genjob},
424            worklimit = n
425          }
426      val cs = do_work(wl,m)
427    in
428      List.mapPartial (fn (k,(c,st)) =>
429                          case st of
430                              Done b => SOME (c,b)
431                            | _ => NONE)
432                      cs
433    end
434
435
436end
437