(* ProcessMultiplexor
 *
 * Runs external commands as child processes, several at a time, while
 * polling their stdout/stderr pipes and reporting what happens to a
 * client-supplied monitor function.  The monitor may answer any message
 * with a client_cmd (Kill / KillAll) which is executed at the end of the
 * current polling round.
 *)
structure ProcessMultiplexor : ProcessMultiplexor =
struct

  infix |>
  (* Reverse application: x |> f is f x. *)
  fun x |> f = f x
  (* Constant combinator: K x ignores its second argument. *)
  fun K x y = x

  (* Association-list lookup; returns the whole (key, value) pair of the
     first entry whose key equals k, or NONE. *)
  fun assoc1 k [] = NONE
    | assoc1 k ((p as (k',v))::rest) = if k = k' then SOME p else assoc1 k rest

  type pid = Posix.ProcEnv.pid
  val pidToWord = Posix.Process.pidToWord
  type exit_status = Posix.Process.exit_status

  (* executable, nm_args and env are handed to Posix.Process.exece as-is. *)
  type command = {executable: string, nm_args : string list, env : string list}
  (* update is applied to (client state, success flag) once the job has
     terminated or been killed (the flag is false when killed). *)
  type 'a job = {tag : string, command : command, update : 'a * bool -> 'a}
  datatype 'a genjob_result =
           NoMoreJobs of 'a | NewJob of ('a job * 'a) | GiveUpAndDie of 'a
  type 'a workprovider = { initial : 'a, genjob : 'a -> 'a genjob_result }

  (* A job that has been started: the job's own fields plus timing data,
     the two pipe streams, their EOF-seen flags and the child's pid. *)
  type 'a working_job = {
    tag : string,
    command : command,
    update : 'a * bool -> 'a,
    starttime : Time.time,
    lastevent : Time.time,
    out : TextIO.instream,
    err : TextIO.instream,
    outeof : bool,
    erreof : bool,
    pid : pid
  }
  type jobkey = pid * string
  datatype strmtype = OUT | ERR
  datatype monitor_message =
           Output of jobkey * Time.time * strmtype * string
         | NothingSeen of jobkey * {delay: Time.time, total_elapsed : Time.time}
         | Terminated of jobkey * exit_status * Time.time
         | MonitorKilled of jobkey * Time.time
         | EOF of jobkey * strmtype * Time.time
         | StartJob of jobkey
  datatype client_cmd = Kill of jobkey | KillAll
  type monitor = monitor_message -> client_cmd option

  (* Functional record update helpers for the working_job (10 fields) and
     worklist (4 fields) records; used as  updateXX r (U #field v) ... $$ *)
  local
    open FunctionalRecordUpdate
    fun makeUpdateWJ z = makeUpdate10 z (* 10 fields *)
    fun makeUpdateWL z = makeUpdate4 z (* 4 fields *)
  in
    fun updateWJ z = let
      fun from tag command update starttime lastevent out err
               outeof erreof pid =
        {tag = tag, command = command, update = update, starttime = starttime,
         lastevent = lastevent, out = out, err = err, outeof = outeof,
         erreof = erreof, pid = pid}
      fun from' pid erreof outeof err out lastevent starttime update command
                tag =
        {tag = tag, command = command, update = update, starttime = starttime,
         lastevent = lastevent, out = out, err = err, outeof = outeof,
         erreof = erreof, pid = pid}
      fun to f {tag, command, update, starttime, lastevent, out,
                err, outeof, erreof, pid} =
        f tag command update starttime lastevent out err
          outeof erreof pid
    in
      makeUpdateWJ (from, from', to)
    end z
    fun updateWL z = let
      fun from current_jobs current_state worklimit genjob =
        {current_state = current_state, current_jobs = current_jobs,
         worklimit = worklimit, genjob = genjob}
      fun from' genjob worklimit current_state current_jobs =
        {current_state = current_state, current_jobs = current_jobs,
         worklimit = worklimit, genjob = genjob}
      fun to f {current_state, current_jobs, worklimit, genjob} =
        f current_jobs current_state worklimit genjob
    in
      makeUpdateWL (from, from', to)
    end z

    val U = U
    val $$ = $$
  end

  (* Record "now" as the time of the job's most recent event. *)
  fun touch (wj : 'a working_job) : 'a working_job =
    updateWJ wj (U #lastevent (Time.now())) $$

  (* Best-effort close of both pipe streams of a job, releasing the
     underlying descriptors.  An I/O failure while closing is ignored on
     purpose: the job is being discarded and there is nothing useful the
     caller could do about it. *)
  fun close_streams (wj : 'a working_job) : unit =
    let
      fun closeQuietly strm = TextIO.closeIn strm handle IO.Io _ => ()
    in
      closeQuietly (#out wj);
      closeQuietly (#err wj)
    end

  (* Wrap a POSIX file descriptor as a TextIO instream.  The reader's
     initial blocking mode is taken from the descriptor's O.nonblock flag. *)
  fun mkTIO_instream fd =
    let
      open Posix.IO
      val (flags,_) = getfl fd
      val rdr = mkTextReader { fd = fd, name = "", initBlkMode = not(O.allSet(O.nonblock,flags)) }
    in
      TextIO.mkInstream (TextIO.StreamIO.mkInstream (rdr, ""))
    end

  (* Order job keys by pid (as a word), then by tag. *)
  fun jobkey_compare((p1,s1), (p2,s2)) =
    case SysWord.compare(pidToWord p1, pidToWord p2) of
        EQUAL => String.compare(s1,s2)
      | x => x
  fun wjkey ({tag,pid,...} : 'a working_job) = (pid,tag)
  (* List membership for job keys, using jobkey_compare. *)
  fun wjk_member x [] = false
    | wjk_member x (h::t) = jobkey_compare(x,h) = EQUAL orelse wjk_member x t

  type 'a worklist = {
    current_jobs : (jobkey, 'a working_job) Binarymap.dict,
    current_state : 'a,
    worklimit : int,
    genjob : 'a -> 'a genjob_result
  }

  (* The record currently stored in the worklist under wj's key, or wj
     itself if the worklist holds no such job. *)
  fun current_job (wl : 'a worklist) (wj : 'a working_job) : 'a working_job =
    case Binarymap.peek (#current_jobs wl, wjkey wj) of
        SOME wj' => wj'
      | NONE => wj

  (* Build an input poll descriptor for an instream.  The reader (and any
     buffered data) is extracted from the stream and a fresh functional
     stream over the same reader and buffer is installed back into strm.
     Raises Fail if the reader has no I/O descriptor or the descriptor
     cannot be polled. *)
  fun inStreamInPoll (strm : TextIO.instream) =
    let
      val (rd as TextPrimIO.RD{ioDesc,...}, buf) =
          TextIO.StreamIO.getReader(TextIO.getInstream strm)
      val _ =
          TextIO.setInstream (strm, TextIO.StreamIO.mkInstream(rd,buf))
    in
      case ioDesc of
          NONE => raise Fail "Can't poll instream"
        | SOME d => OS.IO.pollIn (valOf (OS.IO.pollDesc d))
                    handle Option => raise Fail "Can't poll instream"
    end

  (* Poll descriptors for a job's stdout and stderr (in that order), each
     paired with the channel and the job record it belongs to. *)
  fun workjob_polls (wj : 'a working_job) =
    [(inStreamInPoll (#out wj), (OUT, wj)),
     (inStreamInPoll (#err wj), (ERR, wj))]

  fun worklist_polls (wl : 'a worklist) =
    Binarymap.foldl (fn (_, wj, acc) => workjob_polls wj @ acc) []
                    (#current_jobs wl)

  (* An empty worklist for the given provider and concurrency limit. *)
  fun new_worklist {worklimit : int,provider : 'a workprovider} : 'a worklist =
    {current_jobs = Binarymap.mkDict jobkey_compare,
     genjob = #genjob provider,
     current_state = #initial provider,
     worklimit = worklimit}

  (* Replace the entry for key k with f applied to the current entry
     (as an option). *)
  fun fupdjob k f (wl : 'a worklist) : 'a worklist =
    let
      val cj = #current_jobs wl
      val cj' = Binarymap.insert(cj, k, f (Binarymap.peek(cj, k)))
    in
      updateWL wl (U #current_jobs cj') $$
    end
  fun cjs_addjob (wj : 'a working_job) d = Binarymap.insert(d, wjkey wj, wj)
  (* Insert wj under its own key, overwriting any previous record. *)
  fun addjob (wj:'a working_job) = fupdjob (wjkey wj) (fn _ => wj)

  fun updstate s (wl : 'a worklist) : 'a worklist =
    updateWL wl (U #current_state s) $$

  (* Fork a child running the job's command with its stdin, stdout and
     stderr attached to fresh pipes; return the working_job record holding
     the parent's read ends of the stdout/stderr pipes.  The parent closes
     both ends of the stdin pipe, so the child's stdin is at end-of-file. *)
  fun start_job (j : 'a job) : 'a working_job =
    let
      open Posix.Process Posix.IO
      val {tag, command, update} = j
      val {executable,env,nm_args} = command
      val {infd=outinfd, outfd = outoutfd} = pipe()
      val {infd=errinfd, outfd = erroutfd} = pipe()
      val {infd=ininfd, outfd = inoutfd} = pipe()
    in
      case fork() of
          NONE =>
          (* Child.  If anything here raises (most likely exece, e.g. for
             a missing executable) the child must not fall back into the
             parent's code, so it exits immediately with status 127. *)
          (let
             val () = dup2 {old = outoutfd, new = Posix.FileSys.stdout}
             val () = dup2 {old = erroutfd, new = Posix.FileSys.stderr}
             val () = dup2 {old = ininfd, new = Posix.FileSys.stdin}
             val () =
                 List.app close [errinfd, erroutfd, outinfd, outoutfd,
                                 ininfd, inoutfd]
           in
             exece(executable,nm_args,env)
           end handle _ => Posix.Process.exit 0w127)
        | SOME pid =>
          let
            val out = mkTIO_instream outinfd
            val err = mkTIO_instream errinfd
            val () = List.app close [outoutfd, erroutfd, ininfd, inoutfd]
          in
            {
              tag = tag,
              command = command,
              update = update,
              out = out, outeof = false,
              err = err, erreof = false,
              pid = pid,
              starttime = Time.now(),
              lastevent = Time.now()
            }
          end
    end

  (* A command running cline via /bin/sh -c, with extra_env placed in
     front of the current process environment. *)
  fun mk_shell_command {cline,extra_env} : command =
    {executable = "/bin/sh", nm_args = ["/bin/sh", "-c", cline],
     env = extra_env @ Posix.ProcEnv.environ()}
  fun simple_shell s = mk_shell_command {cline = s, extra_env = []}

  (* Run s through the shell and return its output lines: all stdout
     lines first, then all stderr lines prefixed with "ERR: ".  The
     streams are closed and the child is reaped before returning.
     NOTE(review): stdout is read to EOF before stderr is touched;
     presumably a child producing a lot of stderr output could stall
     here -- confirm whether callers care. *)
  fun shellcommand s =
    let
      open Posix.Process
      val j :int job = {tag = s, command = simple_shell s, update = K 0}
      val wj = start_job j
      fun read pfx acc strm k =
        case TextIO.inputLine strm of
            NONE => k acc
          | SOME s => read pfx ((pfx^s)::acc) strm k
    in
      read "" [] (#out wj) (fn a => read "ERR: " a (#err wj) List.rev) before
      (close_streams wj;
       ignore (waitpid (W_CHILD (#pid wj), [])))
    end

  fun markeof0 chan (wj : 'a working_job) : 'a working_job =
    case chan of
        OUT => updateWJ wj (U #outeof true) $$
      | ERR => updateWJ wj (U #erreof true) $$

  (* Flag EOF on the given channel and refresh the last-event time. *)
  fun markeof chan wj = wj |> markeof0 chan |> touch

  fun chan_name OUT = "OUT"
    | chan_name ERR = "ERR"

  (* Start new jobs until the worklist holds worklimit jobs or the
     generator stops supplying them.  Every started job is announced to
     the monitor with StartJob; commands the monitor returns are consed
     onto cmds.  GiveUpAndDie queues a KillAll. *)
  fun fill_workq monitorfn (acc as (cmds, wl : 'a worklist)) =
    let
      val {current_jobs,current_state,genjob,worklimit,...} = wl
    in
      if Binarymap.numItems current_jobs >= worklimit then acc
      else
        case genjob current_state of
            NoMoreJobs s' => (cmds, updstate s' wl)
          | NewJob (job, state') =>
            let
              val wj = start_job job
              val cmds' = case monitorfn (StartJob (wjkey wj)) of
                              NONE => cmds
                            | SOME c => c::cmds
            in
              fill_workq monitorfn
                         (cmds', wl |> addjob wj |> updstate state')
            end
          | GiveUpAndDie s' => (KillAll :: cmds, updstate s' wl)
    end

  (* A monitor that prints one line per message to standard output.  It
     requests KillAll when a job terminates with a status other than
     W_EXITED and never requests anything otherwise. *)
  fun text_monitor m =
    let
      open Posix.Process
      fun p0 tag t msg killp =
        (print (tag ^ "(" ^ Time.toString t ^ ") " ^ msg ^ "\n");
         killp)
      fun p tag t msg = p0 tag t msg NONE
    in
      case m of
          Output((pid,tag), t, chan, s) =>
            p tag t ("["^chan_name chan^"]: " ^ s)
        | NothingSeen ((pid,tag), {delay,total_elapsed}) =>
            p tag total_elapsed ("delayed " ^ Time.toString delay)
        | Terminated((pid,tag), st, t) =>
            p0 tag t ("exited " ^ (if st = W_EXITED then "OK" else "FAILED"))
               (if st = W_EXITED then NONE else SOME KillAll)
        | MonitorKilled((pid,tag), t) => p tag t "monitor-killed"
        | EOF ((pid,tag), chan, t) =>
            p tag t ("EOF on " ^ chan_name chan)
        | StartJob (pid,tag) => p tag (Time.fromSeconds 0) "beginning"
    end

  fun wjstrm ERR (wj:'a working_job) = #err wj
    | wjstrm OUT wj = #out wj

  (* Kill the job with key jk: send it the kill signal, reap it, close
     its streams, apply its update function with a false success flag,
     tell the monitor (MonitorKilled) and drop it from the worklist.
     If the worklist holds no such job (it has already terminated, or an
     earlier command in the same batch removed it) the worklist is
     returned unchanged instead of raising Binarymap.NotFound. *)
  fun killjob mfn (jk:jobkey) (wl : 'a worklist) =
    let
      open Posix.Process
      val cjs = #current_jobs wl
    in
      case Binarymap.peek (cjs, jk) of
          NONE => wl
        | SOME job =>
          let
            val pid = #pid job
            val state = #update job (#current_state wl, false)
          in
            kill (K_PROC pid, Posix.Signal.kill);
            ignore (waitpid(W_CHILD pid, []));
            close_streams job;
            ignore (mfn (MonitorKilled(jk,Time.-(Time.now(),#starttime job))));
            updateWL wl
                     (U #current_state state)
                     (U #current_jobs (#1 (Binarymap.remove(cjs, jk)))) $$
          end
    end

  fun killall mfn (wl : 'a worklist) =
    Binarymap.foldl (fn (k,_,acc) => killjob mfn k acc)
                    wl
                    (#current_jobs wl)

  (* Execute client commands in list order.  KillAll also replaces the
     job generator with NoMoreJobs so that no further jobs are started. *)
  fun execute_cmds mfn cmds wl =
    case cmds of
        [] => wl
      | KillAll :: rest =>
          wl |> killall mfn
             |> (fn wl => updateWL wl (U #genjob NoMoreJobs) $$)
             |> execute_cmds mfn rest
      | Kill jk :: rest =>
          wl |> killjob mfn jk |> execute_cmds mfn rest

  (* Time since the job was started.  The argument is annotated so the
     field selection does not depend on flex-record resolution. *)
  fun elapsed (wj : 'a working_job) = Time.-(Time.now(), #starttime wj)

  (* Main loop.  Repeatedly: poll every job's pipes (100ms timeout),
     forward output and EOF events to the monitor, check jobs that saw no
     I/O this round for termination, execute the commands the monitor
     returned, then top the worklist up again.  Returns the final client
     state once no jobs remain. *)
  fun do_work (wl0 : 'a worklist, monitorfn) =
    let
      open Posix.Process
      val (cmds, wl1) = fill_workq monitorfn ([], wl0)
      (* Send msg to the monitor, collecting any command it returns. *)
      fun monitor msg (acc as (cmds, wl)) =
        case monitorfn msg of
            NONE => acc
          | SOME c => (c::cmds, wl)
      (* Job still running and silent this round: keep it, report it. *)
      fun nothing wj (cmds, wl) =
        let
          val msg =
              NothingSeen (wjkey wj, {delay = Time.-(Time.now(), #lastevent wj),
                                      total_elapsed = elapsed wj})
        in
          monitor msg (cmds, addjob wj wl)
        end
      (* Job has terminated: it is not re-added to the worklist, so its
         streams are closed here to release their descriptors. *)
      fun exitstatus wj status (cs, wl) =
        let
          val () = close_streams wj
          val msg = Terminated (wjkey wj, status, elapsed wj)
          val newstate = #update wj (#current_state wl, status = W_EXITED)
        in
          monitor msg (cs, updateWL wl (U #current_state newstate) $$)
        end
      fun eof wj chan (cmds, wl) =
        monitor (EOF (wjkey wj, chan, elapsed wj))
                (cmds, addjob (markeof chan wj) wl)
      (* True if EOF has not yet been recorded for chan on this job. *)
      fun is_neweof (wj : 'a working_job) chan =
        case chan of
            ERR => not (#erreof wj)
          | OUT => not (#outeof wj)
      (* Jobs that did I/O this round are kept without a wait; the rest
         get a non-blocking waitpid. *)
      fun dowait didio (k (* key *), wj, acc as (cmds,wl)) =
        if wjk_member k didio then (cmds, addjob wj wl)
        else
          case waitpid_nh(W_CHILD (#pid wj), []) of
              NONE => nothing wj acc
            | SOME (_, status) => exitstatus wj status acc

      (* Rebuild the job map from empty, keeping only surviving jobs. *)
      fun workloop didio (cmds, wl) =
        let
          val empty_jobs = Binarymap.mkDict jobkey_compare
        in
          Binarymap.foldl (dowait didio)
                          (cmds, updateWL wl (U #current_jobs empty_jobs) $$)
                          (#current_jobs wl)
        end

      fun loop (cmds, wl : 'a worklist) : 'a =
        if Binarymap.numItems (#current_jobs wl) = 0 then #current_state wl
        else
          let
            val polls = worklist_polls wl
            val active =
                OS.IO.poll(map #1 polls, SOME (Time.fromMilliseconds 100))
            fun foldthis (pi, (acc as (cmds, wl),didio)) =
              let
                val iod = OS.IO.pollToIODesc (OS.IO.infoToPollDesc pi)
              in
                case List.find (fn (pd,_) => iod = OS.IO.pollToIODesc pd) polls
                 of
                    NONE => raise Fail "Couldn't find poll-data in List.find"
                  | SOME (_, (chan, wj0)) =>
                    let
                      (* polls was built before this fold began, so wj0 may
                         predate an update made earlier in the same round
                         (e.g. the job's other channel just reached EOF).
                         Work from the live record so that update is not
                         overwritten. *)
                      val wj = current_job wl wj0
                      val s = TextIO.input (wjstrm chan wj)
                      val didio' = wjkey wj :: didio
                    in
                      if size s = 0 then
                        if is_neweof wj chan then (eof wj chan acc, didio')
                        else (acc,didio)
                      else
                        let
                          val msg = Output(wjkey wj, elapsed wj, chan, s)
                        in
                          (monitor msg (cmds, addjob (touch wj) wl), didio')
                        end
                    end
              end
            val ((cmds, wl), didio) =
                List.foldl foldthis ((cmds,wl), []) active
            val (cmds, wl) = workloop didio (cmds, wl)
            val wl = execute_cmds monitorfn cmds wl
          in
            loop (fill_workq monitorfn ([], wl))
          end
    in
      loop (cmds, wl1)
    end

  (* Apply f to the value stored under key k in an association list.
     Raises Fail if k is absent. *)
  fun fupdAlist k f [] = raise Fail "updAlist: No element with given key"
    | fupdAlist k f ((k',v') :: rest) =
      if k=k' then (k,f v') :: rest
      else (k',v') :: fupdAlist k f rest
  (* Find the first element satisfying P and replace it by f of itself.
     The continuation k receives (SOME updated-element, updated list), or
     (NONE, original list) when nothing matches.  f is applied once. *)
  fun findUpd P f k [] = k (NONE, [])
    | findUpd P f k (x::xs) =
      if P x then
        let val x' = f x in k (SOME x', x' :: xs) end
      else findUpd P f (fn (res, l) => k (res, x::l)) xs


  (* Run the shell command lines cmds0 with at most n running at once,
     reporting to monitor m.  Returns (command, succeeded) for every
     command that finished, in the original order. *)
  fun shell_commands m (cmds0, n) =
    let
      datatype stat = Waiting | Running | Done of bool
      (* Tags are the single characters "A", "B", ... as long as the
         character code exists; beyond Char.maxOrd, chr would raise Chr,
         so later jobs get "#<index>" instead. *)
      fun mktag i =
        if i <= Char.maxOrd then str (chr i)
        else "#" ^ Int.toString (i - 65)
      val (cmds00, _) =
          List.foldl
            (fn (c, (cs, n)) => ((mktag n, (c, Waiting))::cs, n + 1))
            ([], 65)
            cmds0
      val cmds = List.rev cmds00
      (* Pick the first Waiting command, mark it Running and turn it into
         a job whose update records Done with the success flag. *)
      fun genjob clist =
        let
          val (cdata, l) = findUpd (fn (_, (_, s)) => s = Waiting)
                                   (fn (k, (c, _)) => (k, (c, Running)))
                                   (fn x => x)
                                   clist
        in
          case cdata of
              NONE => NoMoreJobs clist
            | SOME (t, (c, _)) =>
              let
                fun upd(clist, b) = fupdAlist t (fn (c,_) => (c,Done b)) clist
              in
                NewJob ({tag = t, command = simple_shell c, update = upd}, l)
              end
        end
      val wl =
          new_worklist {
            provider = {initial = cmds, genjob = genjob},
            worklimit = n
          }
      val cs = do_work(wl,m)
    in
      List.mapPartial (fn (k,(c,st)) =>
                          case st of
                              Done b => SOME (c,b)
                            | _ => NONE)
                      cs
    end


end