1/* Title: Pure/PIDE/session.scala 2 Author: Makarius 3 Options: :folding=explicit: 4 5PIDE editor session, potentially with running prover process. 6*/ 7 8package isabelle 9 10 11import scala.collection.immutable.Queue 12 13 14object Session 15{ 16 /* outlets */ 17 18 object Consumer 19 { 20 def apply[A](name: String)(consume: A => Unit): Consumer[A] = 21 new Consumer[A](name, consume) 22 } 23 final class Consumer[-A] private(val name: String, val consume: A => Unit) 24 25 class Outlet[A](dispatcher: Consumer_Thread[() => Unit]) 26 { 27 private val consumers = Synchronized[List[Consumer[A]]](Nil) 28 29 def += (c: Consumer[A]) { consumers.change(Library.update(c)) } 30 def -= (c: Consumer[A]) { consumers.change(Library.remove(c)) } 31 32 def post(a: A) 33 { 34 for (c <- consumers.value.iterator) { 35 dispatcher.send(() => 36 try { c.consume(a) } 37 catch { 38 case exn: Throwable => 39 Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn)) 40 }) 41 } 42 } 43 } 44 45 46 /* change */ 47 48 sealed case class Change( 49 previous: Document.Version, 50 syntax_changed: List[Document.Node.Name], 51 deps_changed: Boolean, 52 doc_edits: List[Document.Edit_Command], 53 consolidate: List[Document.Node.Name], 54 version: Document.Version) 55 56 case object Change_Flush 57 58 59 /* events */ 60 61 //{{{ 62 case class Statistics(props: Properties.T) 63 case class Global_Options(options: Options) 64 case object Caret_Focus 65 case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) 66 case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String) 67 case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])]) 68 case class Commands_Changed( 69 assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command]) 70 71 sealed abstract class Phase 72 { 73 def print: String = 74 this match { 75 case Terminated(result) => if (result.ok) "finished" else "failed" 76 case _ => Word.lowercase(this.toString) 77 } 78 } 79 case object Inactive extends Phase // stable 80 case object Startup extends Phase // transient 81 case object Ready extends Phase // metastable 82 case object Shutdown extends Phase // transient 83 case class Terminated(result: Process_Result) extends Phase // stable 84 //}}} 85 86 87 /* syslog */ 88 89 private[Session] class Syslog(limit: Int) 90 { 91 private var queue = Queue.empty[XML.Elem] 92 private var length = 0 93 94 def += (msg: XML.Elem): Unit = synchronized { 95 queue = queue.enqueue(msg) 96 length += 1 97 if (length > limit) queue = queue.dequeue._2 98 } 99 100 def content: String = synchronized { 101 cat_lines(queue.iterator.map(XML.content)) + 102 (if (length > limit) "\n(A total of " + length + " messages...)" else "") 103 } 104 } 105 106 107 /* protocol handlers */ 108 109 abstract class Protocol_Handler 110 { 111 def init(session: Session): Unit = {} 112 def exit(): Unit = {} 113 val functions: List[(String, Prover.Protocol_Output => Boolean)] 114 } 115} 116 117 118class Session(session_options: => Options, val resources: Resources) extends Document.Session 119{ 120 session => 121 122 val xml_cache: XML.Cache = XML.make_cache() 123 val xz_cache: XZ.Cache = XZ.make_cache() 124 125 126 /* global flags */ 127 128 @volatile var timing: Boolean = false 129 @volatile var verbose: Boolean = false 130 131 132 /* dynamic session options */ 133 134 def output_delay: Time = session_options.seconds("editor_output_delay") 135 def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay") 136 def prune_delay: Time = session_options.seconds("editor_prune_delay") 137 def prune_size: Int = session_options.int("editor_prune_size") 138 def syslog_limit: Int = session_options.int("editor_syslog_limit") 139 def reparse_limit: Int = session_options.int("editor_reparse_limit") 140 141 142 /* dispatcher */ 143 144 private val dispatcher = 145 Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true } 146 147 def assert_dispatcher[A](body: => A): A = 148 { 149 assert(dispatcher.check_thread) 150 body 151 } 152 153 def require_dispatcher[A](body: => A): A = 154 { 155 require(dispatcher.check_thread) 156 body 157 } 158 159 def send_dispatcher(body: => Unit): Unit = 160 { 161 if (dispatcher.check_thread) body 162 else dispatcher.send(() => body) 163 } 164 165 def send_wait_dispatcher(body: => Unit): Unit = 166 { 167 if (dispatcher.check_thread) body 168 else dispatcher.send_wait(() => body) 169 } 170 171 172 /* outlets */ 173 174 val statistics = new Session.Outlet[Session.Statistics](dispatcher) 175 val global_options = new Session.Outlet[Session.Global_Options](dispatcher) 176 val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher) 177 val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher) 178 val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher) 179 val phase_changed = new Session.Outlet[Session.Phase](dispatcher) 180 val syslog_messages = new Session.Outlet[Prover.Output](dispatcher) 181 val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher) 182 val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher) 183 val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher) 184 185 val all_messages = new Session.Outlet[Prover.Message](dispatcher) // potential bottle-neck! 186 187 188 /** main protocol manager **/ 189 190 /* internal messages */ 191 192 private case class Start(start_prover: Prover.Receiver => Prover) 193 private case object Stop 194 private case class Cancel_Exec(exec_id: Document_ID.Exec) 195 private case class Protocol_Command(name: String, args: List[String]) 196 private case class Update_Options(options: Options) 197 private case object Consolidate_Execution 198 private case object Prune_History 199 200 201 /* phase */ 202 203 private def post_phase(new_phase: Session.Phase): Session.Phase = 204 { 205 phase_changed.post(new_phase) 206 new_phase 207 } 208 private val _phase = Synchronized[Session.Phase](Session.Inactive) 209 private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase)) 210 211 def phase = _phase.value 212 def is_ready: Boolean = phase == Session.Ready 213 214 215 /* global state */ 216 217 private val syslog = new Session.Syslog(syslog_limit) 218 def syslog_content(): String = syslog.content 219 220 private val global_state = Synchronized(Document.State.init) 221 def current_state(): Document.State = global_state.value 222 223 def recent_syntax(name: Document.Node.Name): Outer_Syntax = 224 global_state.value.recent_finished.version.get_finished.nodes(name).syntax getOrElse 225 resources.session_base.overall_syntax 226 227 228 /* pipelined change parsing */ 229 230 private case class Text_Edits( 231 previous: Future[Document.Version], 232 doc_blobs: Document.Blobs, 233 text_edits: List[Document.Edit_Text], 234 consolidate: List[Document.Node.Name], 235 version_result: Promise[Document.Version]) 236 237 private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true) 238 { 239 case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) => 240 val prev = previous.get_finished 241 val change = 242 Timing.timeit("parse_change", timing) { 243 resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate) 244 } 245 version_result.fulfill(change.version) 246 manager.send(change) 247 true 248 } 249 250 251 /* buffered changes */ 252 253 private object change_buffer 254 { 255 private var assignment: Boolean = false 256 private var nodes: Set[Document.Node.Name] = Set.empty 257 private var commands: Set[Command] = Set.empty 258 259 def flush(): Unit = synchronized { 260 if (assignment || nodes.nonEmpty || commands.nonEmpty) 261 commands_changed.post(Session.Commands_Changed(assignment, nodes, commands)) 262 if (nodes.nonEmpty) consolidation.update(nodes) 263 assignment = false 264 nodes = Set.empty 265 commands = Set.empty 266 } 267 private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() } 268 269 def invoke(assign: Boolean, cmds: List[Command]): Unit = synchronized { 270 assignment |= assign 271 for (command <- cmds) { 272 nodes += command.node_name 273 command.blobs_names.foreach(nodes += _) 274 commands += command 275 } 276 delay_flush.invoke() 277 } 278 279 def shutdown() 280 { 281 delay_flush.revoke() 282 flush() 283 } 284 } 285 286 287 /* postponed changes */ 288 289 private object postponed_changes 290 { 291 private var postponed: List[Session.Change] = Nil 292 293 def store(change: Session.Change): Unit = synchronized { postponed ::= change } 294 295 def flush(state: Document.State): List[Session.Change] = synchronized { 296 val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous)) 297 postponed = unassigned 298 assigned.reverse 299 } 300 } 301 302 303 /* node consolidation */ 304 305 private object consolidation 306 { 307 private val delay = 308 Standard_Thread.delay_first(consolidate_delay) { manager.send(Consolidate_Execution) } 309 310 private val changed_nodes = Synchronized(Set.empty[Document.Node.Name]) 311 312 def update(new_nodes: Set[Document.Node.Name] = Set.empty) 313 { 314 changed_nodes.change(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes) 315 delay.invoke() 316 } 317 318 def flush(): Set[Document.Node.Name] = 319 changed_nodes.change_result(nodes => (nodes, Set.empty)) 320 321 def revoke() { delay.revoke() } 322 } 323 324 325 /* prover process */ 326 327 private object prover 328 { 329 private val variable = Synchronized[Option[Prover]](None) 330 331 def defined: Boolean = variable.value.isDefined 332 def get: Prover = variable.value.get 333 def set(p: Prover) { variable.change(_ => Some(p)) } 334 def reset { variable.change(_ => None) } 335 def await_reset() { variable.guarded_access({ case None => Some((), None) case _ => None }) } 336 } 337 338 339 /* protocol handlers */ 340 341 private val protocol_handlers = Protocol_Handlers.init(session) 342 343 def get_protocol_handler(name: String): Option[Session.Protocol_Handler] = 344 protocol_handlers.get(name) 345 346 def init_protocol_handler(handler: Session.Protocol_Handler): Unit = 347 protocol_handlers.init(handler) 348 349 def init_protocol_handler(name: String): Unit = 350 protocol_handlers.init(name) 351 352 353 /* debugger */ 354 355 private val debugger_handler = new Debugger.Handler(this) 356 init_protocol_handler(debugger_handler) 357 358 def debugger: Debugger = debugger_handler.debugger 359 360 361 /* manager thread */ 362 363 private val delay_prune = 364 Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) } 365 366 private val manager: Consumer_Thread[Any] = 367 { 368 /* raw edits */ 369 370 def handle_raw_edits( 371 doc_blobs: Document.Blobs = Document.Blobs.empty, 372 edits: List[Document.Edit_Text] = Nil, 373 consolidate: List[Document.Node.Name] = Nil) 374 //{{{ 375 { 376 require(prover.defined) 377 378 prover.get.discontinue_execution() 379 380 val previous = global_state.value.history.tip.version 381 val version = Future.promise[Document.Version] 382 global_state.change(_.continue_history(previous, edits, version)) 383 384 raw_edits.post(Session.Raw_Edits(doc_blobs, edits)) 385 change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version)) 386 } 387 //}}} 388 389 390 /* resulting changes */ 391 392 def handle_change(change: Session.Change) 393 //{{{ 394 { 395 require(prover.defined) 396 397 def id_command(command: Command) 398 { 399 for { 400 (name, digest) <- command.blobs_defined 401 if !global_state.value.defined_blob(digest) 402 } { 403 change.version.nodes(name).get_blob match { 404 case Some(blob) => 405 global_state.change(_.define_blob(digest)) 406 prover.get.define_blob(digest, blob.bytes) 407 case None => 408 Output.error_message("Missing blob " + quote(name.toString)) 409 } 410 } 411 412 if (!global_state.value.defined_command(command.id)) { 413 global_state.change(_.define_command(command)) 414 prover.get.define_command(command) 415 } 416 } 417 for { (_, edit) <- change.doc_edits } { 418 edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) }) 419 } 420 421 val assignment = global_state.value.the_assignment(change.previous).check_finished 422 global_state.change(_.define_version(change.version, assignment)) 423 prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate) 424 resources.commit(change) 425 } 426 //}}} 427 428 429 /* prover output */ 430 431 def handle_output(output: Prover.Output) 432 //{{{ 433 { 434 def bad_output() 435 { 436 if (verbose) 437 Output.warning("Ignoring bad prover output: " + output.message.toString) 438 } 439 440 def change_command(f: Document.State => (Command.State, Document.State)) 441 { 442 try { 443 val st = global_state.change_result(f) 444 change_buffer.invoke(false, List(st.command)) 445 } 446 catch { case _: Document.State.Fail => bad_output() } 447 } 448 449 output match { 450 case msg: Prover.Protocol_Output => 451 val handled = protocol_handlers.invoke(msg) 452 if (!handled) { 453 msg.properties match { 454 case Markup.Protocol_Handler(name) if prover.defined => 455 init_protocol_handler(name) 456 457 case Protocol.Command_Timing(state_id, timing) if prover.defined => 458 val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil))) 459 change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache)) 460 461 case Protocol.Theory_Timing(_, _) => 462 // FIXME 463 464 case Markup.Export(args) 465 if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined => 466 val id = Value.Long.unapply(args.id.get).get 467 val export = Export.make_entry("", args, msg.bytes, cache = xz_cache) 468 change_command(_.add_export(id, (args.serial, export))) 469 470 case Markup.Assign_Update => 471 msg.text match { 472 case Protocol.Assign_Update(id, update) => 473 try { 474 val cmds = global_state.change_result(_.assign(id, update)) 475 change_buffer.invoke(true, cmds) 476 manager.send(Session.Change_Flush) 477 } 478 catch { case _: Document.State.Fail => bad_output() } 479 case _ => bad_output() 480 } 481 delay_prune.invoke() 482 483 case Markup.Removed_Versions => 484 msg.text match { 485 case Protocol.Removed(removed) => 486 try { 487 global_state.change(_.removed_versions(removed)) 488 manager.send(Session.Change_Flush) 489 } 490 catch { case _: Document.State.Fail => bad_output() } 491 case _ => bad_output() 492 } 493 494 case Markup.ML_Statistics(props) => 495 statistics.post(Session.Statistics(props)) 496 497 case Markup.Task_Statistics(props) => 498 // FIXME 499 500 case _ => bad_output() 501 } 502 } 503 case _ => 504 output.properties match { 505 case Position.Id(state_id) => 506 change_command(_.accumulate(state_id, output.message, xml_cache)) 507 508 case _ if output.is_init => 509 prover.get.options(session_options) 510 prover.get.session_base(resources) 511 phase = Session.Ready 512 debugger.ready() 513 514 case Markup.Process_Result(result) if output.is_exit => 515 phase = Session.Terminated(result) 516 prover.reset 517 518 case _ => 519 raw_output_messages.post(output) 520 } 521 } 522 } 523 //}}} 524 525 526 /* main thread */ 527 528 Consumer_Thread.fork[Any]("Session.manager", daemon = true) 529 { 530 case arg: Any => 531 //{{{ 532 arg match { 533 case output: Prover.Output => 534 if (output.is_stdout || output.is_stderr) 535 raw_output_messages.post(output) 536 else handle_output(output) 537 538 if (output.is_syslog) { 539 syslog += output.message 540 syslog_messages.post(output) 541 } 542 543 all_messages.post(output) 544 545 case input: Prover.Input => 546 all_messages.post(input) 547 548 case Start(start_prover) if !prover.defined => 549 prover.set(start_prover(manager.send(_))) 550 551 case Stop => 552 consolidation.revoke() 553 delay_prune.revoke() 554 if (prover.defined) { 555 protocol_handlers.exit() 556 global_state.change(_ => Document.State.init) 557 prover.get.terminate 558 } 559 560 case Consolidate_Execution => 561 if (prover.defined) { 562 val state = global_state.value 563 state.stable_tip_version match { 564 case None => consolidation.update() 565 case Some(version) => 566 val consolidate = 567 consolidation.flush().iterator.filter(name => 568 !resources.session_base.loaded_theory(name) && 569 !state.node_consolidated(version, name) && 570 state.node_maybe_consolidated(version, name)).toList 571 if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate) 572 } 573 } 574 575 case Prune_History => 576 if (prover.defined) { 577 val old_versions = global_state.change_result(_.remove_versions(prune_size)) 578 if (old_versions.nonEmpty) prover.get.remove_versions(old_versions) 579 } 580 581 case Update_Options(options) => 582 if (prover.defined && is_ready) { 583 prover.get.options(options) 584 handle_raw_edits() 585 } 586 global_options.post(Session.Global_Options(options)) 587 588 case Cancel_Exec(exec_id) if prover.defined => 589 prover.get.cancel_exec(exec_id) 590 591 case Session.Raw_Edits(doc_blobs, edits) if prover.defined => 592 handle_raw_edits(doc_blobs = doc_blobs, edits = edits) 593 594 case Session.Dialog_Result(id, serial, result) if prover.defined => 595 prover.get.dialog_result(serial, result) 596 handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result))) 597 598 case Protocol_Command(name, args) if prover.defined => 599 prover.get.protocol_command(name, args:_*) 600 601 case change: Session.Change if prover.defined => 602 val state = global_state.value 603 if (!state.removing_versions && state.is_assigned(change.previous)) 604 handle_change(change) 605 else postponed_changes.store(change) 606 607 case Session.Change_Flush if prover.defined => 608 val state = global_state.value 609 if (!state.removing_versions) 610 postponed_changes.flush(state).foreach(handle_change(_)) 611 612 case bad => 613 if (verbose) Output.warning("Ignoring bad message: " + bad.toString) 614 } 615 true 616 //}}} 617 } 618 } 619 620 621 /* main operations */ 622 623 def snapshot(name: Document.Node.Name = Document.Node.Name.empty, 624 pending_edits: List[Text.Edit] = Nil): Document.Snapshot = 625 global_state.value.snapshot(name, pending_edits) 626 627 def start(start_prover: Prover.Receiver => Prover) 628 { 629 _phase.change( 630 { 631 case Session.Inactive => 632 manager.send(Start(start_prover)) 633 post_phase(Session.Startup) 634 case phase => error("Cannot start prover in phase " + quote(phase.print)) 635 }) 636 } 637 638 def send_stop() 639 { 640 val was_ready = 641 _phase.guarded_access(phase => 642 phase match { 643 case Session.Startup | Session.Shutdown => None 644 case Session.Terminated(_) => Some((false, phase)) 645 case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0))))) 646 case Session.Ready => Some((true, post_phase(Session.Shutdown))) 647 }) 648 if (was_ready) manager.send(Stop) 649 } 650 651 def stop(): Process_Result = 652 { 653 send_stop() 654 prover.await_reset() 655 656 change_parser.shutdown() 657 change_buffer.shutdown() 658 manager.shutdown() 659 dispatcher.shutdown() 660 661 phase match { 662 case Session.Terminated(result) => result 663 case phase => error("Bad session phase after shutdown: " + quote(phase.print)) 664 } 665 } 666 667 def protocol_command(name: String, args: String*) 668 { manager.send(Protocol_Command(name, args.toList)) } 669 670 def cancel_exec(exec_id: Document_ID.Exec) 671 { manager.send(Cancel_Exec(exec_id)) } 672 673 def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]) 674 { if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits)) } 675 676 def update_options(options: Options) 677 { manager.send_wait(Update_Options(options)) } 678 679 def dialog_result(id: Document_ID.Generic, serial: Long, result: String) 680 { manager.send(Session.Dialog_Result(id, serial, result)) } 681} 682