1/* Title: Pure/PIDE/prover.scala 2 Author: Makarius 3 Options: :folding=explicit: 4 5Prover process wrapping. 6*/ 7 8package isabelle 9 10 11import java.io.{InputStream, OutputStream, BufferedOutputStream, IOException} 12 13 14object Prover 15{ 16 /* messages */ 17 18 sealed abstract class Message 19 type Receiver = Message => Unit 20 21 class Input(val name: String, val args: List[String]) extends Message 22 { 23 override def toString: String = 24 XML.Elem(Markup(Markup.PROVER_COMMAND, List((Markup.NAME, name))), 25 args.map(s => 26 List(XML.Text("\n"), XML.elem(Markup.PROVER_ARG, YXML.parse_body(s)))).flatten).toString 27 } 28 29 class Output(val message: XML.Elem) extends Message 30 { 31 def kind: String = message.markup.name 32 def properties: Properties.T = message.markup.properties 33 def body: XML.Body = message.body 34 35 def is_init = kind == Markup.INIT 36 def is_exit = kind == Markup.EXIT 37 def is_stdout = kind == Markup.STDOUT 38 def is_stderr = kind == Markup.STDERR 39 def is_system = kind == Markup.SYSTEM 40 def is_status = kind == Markup.STATUS 41 def is_report = kind == Markup.REPORT 42 def is_syslog = is_init || is_exit || is_system || is_stderr 43 44 override def toString: String = 45 { 46 val res = 47 if (is_status || is_report) message.body.map(_.toString).mkString 48 else Pretty.string_of(message.body) 49 if (properties.isEmpty) 50 kind.toString + " [[" + res + "]]" 51 else 52 kind.toString + " " + 53 (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]" 54 } 55 } 56 57 class Protocol_Output(props: Properties.T, val bytes: Bytes) 58 extends Output(XML.Elem(Markup(Markup.PROTOCOL, props), Nil)) 59 { 60 lazy val text: String = bytes.text 61 } 62} 63 64 65class Prover( 66 receiver: Prover.Receiver, 67 xml_cache: XML.Cache, 68 channel: System_Channel, 69 process: Bash.Process) extends Protocol 70{ 71 /** receiver output **/ 72 73 private def system_output(text: String) 74 { 75 receiver(new Prover.Output(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))) 76 } 77 78 private def protocol_output(props: Properties.T, bytes: Bytes) 79 { 80 receiver(new Prover.Protocol_Output(props, bytes)) 81 } 82 83 private def output(kind: String, props: Properties.T, body: XML.Body) 84 { 85 if (kind == Markup.INIT) channel.accepted() 86 87 val main = XML.Elem(Markup(kind, props), Protocol_Message.clean_reports(body)) 88 val reports = Protocol_Message.reports(props, body) 89 for (msg <- main :: reports) receiver(new Prover.Output(xml_cache.elem(msg))) 90 } 91 92 private def exit_message(result: Process_Result) 93 { 94 output(Markup.EXIT, Markup.Process_Result(result), 95 List(XML.Text("Return code: " + result.rc.toString))) 96 } 97 98 99 100 /** process manager **/ 101 102 private val process_result: Future[Process_Result] = 103 Future.thread("process_result") { 104 val rc = process.join 105 val timing = process.get_timing 106 Process_Result(rc, timing = timing) 107 } 108 109 private def terminate_process() 110 { 111 try { process.terminate } 112 catch { 113 case exn @ ERROR(_) => system_output("Failed to terminate prover process: " + exn.getMessage) 114 } 115 } 116 117 private val process_manager = Standard_Thread.fork("process_manager") 118 { 119 val (startup_failed, startup_errors) = 120 { 121 var finished: Option[Boolean] = None 122 val result = new StringBuilder(100) 123 while (finished.isEmpty && (process.stderr.ready || !process_result.is_finished)) { 124 while (finished.isEmpty && process.stderr.ready) { 125 try { 126 val c = process.stderr.read 127 if (c == 2) finished = Some(true) 128 else result += c.toChar 129 } 130 catch { case _: IOException => finished = Some(false) } 131 } 132 Thread.sleep(10) 133 } 134 (finished.isEmpty || !finished.get, result.toString.trim) 135 } 136 if (startup_errors != "") system_output(startup_errors) 137 138 if (startup_failed) { 139 terminate_process() 140 process_result.join 141 exit_message(Process_Result(127)) 142 } 143 else { 144 val (command_stream, message_stream) = channel.rendezvous() 145 146 command_input_init(command_stream) 147 val stdout = physical_output(false) 148 val stderr = physical_output(true) 149 val message = message_output(message_stream) 150 151 val result = process_result.join 152 system_output("process terminated") 153 command_input_close() 154 for (thread <- List(stdout, stderr, message)) thread.join 155 system_output("process_manager terminated") 156 exit_message(result) 157 } 158 channel.accepted() 159 } 160 161 162 /* management methods */ 163 164 def join() { process_manager.join() } 165 166 def terminate() 167 { 168 system_output("Terminating prover process") 169 command_input_close() 170 171 var count = 10 172 while (!process_result.is_finished && count > 0) { 173 Thread.sleep(100) 174 count -= 1 175 } 176 if (!process_result.is_finished) terminate_process() 177 } 178 179 180 181 /** process streams **/ 182 183 /* command input */ 184 185 private var command_input: Option[Consumer_Thread[List[Bytes]]] = None 186 187 private def command_input_close(): Unit = command_input.foreach(_.shutdown) 188 189 private def command_input_init(raw_stream: OutputStream) 190 { 191 val name = "command_input" 192 val stream = new BufferedOutputStream(raw_stream) 193 command_input = 194 Some( 195 Consumer_Thread.fork(name)( 196 consume = 197 { 198 case chunks => 199 try { 200 Bytes(chunks.map(_.length).mkString("", ",", "\n")).write_stream(stream) 201 chunks.foreach(_.write_stream(stream)) 202 stream.flush 203 true 204 } 205 catch { case e: IOException => system_output(name + ": " + e.getMessage); false } 206 }, 207 finish = { case () => stream.close; system_output(name + " terminated") } 208 ) 209 ) 210 } 211 212 213 /* physical output */ 214 215 private def physical_output(err: Boolean): Thread = 216 { 217 val (name, reader, markup) = 218 if (err) ("standard_error", process.stderr, Markup.STDERR) 219 else ("standard_output", process.stdout, Markup.STDOUT) 220 221 Standard_Thread.fork(name) { 222 try { 223 var result = new StringBuilder(100) 224 var finished = false 225 while (!finished) { 226 //{{{ 227 var c = -1 228 var done = false 229 while (!done && (result.length == 0 || reader.ready)) { 230 c = reader.read 231 if (c >= 0) result.append(c.asInstanceOf[Char]) 232 else done = true 233 } 234 if (result.length > 0) { 235 output(markup, Nil, List(XML.Text(Symbol.decode(result.toString)))) 236 result.length = 0 237 } 238 else { 239 reader.close 240 finished = true 241 } 242 //}}} 243 } 244 } 245 catch { case e: IOException => system_output(name + ": " + e.getMessage) } 246 system_output(name + " terminated") 247 } 248 } 249 250 251 /* message output */ 252 253 private def message_output(stream: InputStream): Thread = 254 { 255 class EOF extends Exception 256 class Protocol_Error(msg: String) extends Exception(msg) 257 258 val name = "message_output" 259 Standard_Thread.fork(name) { 260 val default_buffer = new Array[Byte](65536) 261 var c = -1 262 263 def read_int(): Int = 264 //{{{ 265 { 266 var n = 0 267 c = stream.read 268 if (c == -1) throw new EOF 269 while (48 <= c && c <= 57) { 270 n = 10 * n + (c - 48) 271 c = stream.read 272 } 273 if (c != 10) 274 throw new Protocol_Error("malformed header: expected integer followed by newline") 275 else n 276 } 277 //}}} 278 279 def read_chunk_bytes(): (Array[Byte], Int) = 280 //{{{ 281 { 282 val n = read_int() 283 val buf = 284 if (n <= default_buffer.length) default_buffer 285 else new Array[Byte](n) 286 287 var i = 0 288 var m = 0 289 do { 290 m = stream.read(buf, i, n - i) 291 if (m != -1) i += m 292 } 293 while (m != -1 && n > i) 294 295 if (i != n) 296 throw new Protocol_Error("bad chunk (unexpected EOF after " + i + " of " + n + " bytes)") 297 298 (buf, n) 299 } 300 //}}} 301 302 def read_chunk(): XML.Body = 303 { 304 val (buf, n) = read_chunk_bytes() 305 YXML.parse_body_failsafe(UTF8.decode_chars(Symbol.decode, buf, 0, n)) 306 } 307 308 try { 309 do { 310 try { 311 val header = read_chunk() 312 header match { 313 case List(XML.Elem(Markup(name, props), Nil)) => 314 val kind = name.intern 315 if (kind == Markup.PROTOCOL) { 316 val (buf, n) = read_chunk_bytes() 317 protocol_output(props, Bytes(buf, 0, n)) 318 } 319 else { 320 val body = read_chunk() 321 output(kind, props, body) 322 } 323 case _ => 324 read_chunk() 325 throw new Protocol_Error("bad header: " + header.toString) 326 } 327 } 328 catch { case _: EOF => } 329 } 330 while (c != -1) 331 } 332 catch { 333 case e: IOException => system_output("Cannot read message:\n" + e.getMessage) 334 case e: Protocol_Error => system_output("Malformed message:\n" + e.getMessage) 335 } 336 stream.close 337 338 system_output(name + " terminated") 339 } 340 } 341 342 343 344 /** protocol commands **/ 345 346 def protocol_command_bytes(name: String, args: Bytes*): Unit = 347 command_input match { 348 case Some(thread) => thread.send(Bytes(name) :: args.toList) 349 case None => error("Uninitialized command input thread") 350 } 351 352 def protocol_command(name: String, args: String*) 353 { 354 receiver(new Prover.Input(name, args.toList)) 355 protocol_command_bytes(name, args.map(Bytes(_)): _*) 356 } 357} 358