/*  Title:      Pure/PIDE/session.scala
    Author:     Makarius
    Options:    :folding=explicit:

PIDE editor session, potentially with running prover process.
*/

package isabelle


import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.annotation.tailrec


object Session
{
  /* outlets */

  object Consumer
  {
    def apply[A](name: String)(consume: A => Unit): Consumer[A] =
      new Consumer[A](name, consume)
  }

  // Named callback; the name is only used for error reporting in Outlet.post.
  final class Consumer[-A] private(val name: String, val consume: A => Unit)

  // Event outlet: a set of consumers that receive every posted value via the
  // given dispatcher thread.
  class Outlet[A](dispatcher: Consumer_Thread[() => Unit])
  {
    private val consumers = Synchronized[List[Consumer[A]]](Nil)

    def += (c: Consumer[A]): Unit = consumers.change(Library.update(c))
    def -= (c: Consumer[A]): Unit = consumers.change(Library.remove(c))

    // Send one dispatcher task per currently registered consumer.  A failing
    // consumer is reported via Output.error_message and does not affect the
    // tasks of the other consumers.
    def post(a: A): Unit =
    {
      for (c <- consumers.value.iterator) {
        dispatcher.send(() =>
          try { c.consume(a) }
          catch {
            case exn: Throwable =>
              Output.error_message("Consumer failed: " + quote(c.name) + "\n" + Exn.message(exn))
          })
      }
    }
  }


  /* change */

  sealed case class Change(
    previous: Document.Version,
    syntax_changed: List[Document.Node.Name],
    deps_changed: Boolean,
    doc_edits: List[Document.Edit_Command],
    consolidate: List[Document.Node.Name],
    version: Document.Version)

  case object Change_Flush


  /* events */

  //{{{
  case class Statistics(props: Properties.T)
  case class Global_Options(options: Options)
  case object Caret_Focus
  case class Raw_Edits(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text])
  case class Dialog_Result(id: Document_ID.Generic, serial: Long, result: String)
  case class Build_Theories(id: String, master_dir: Path, theories: List[(Options, List[Path])])
  case class Commands_Changed(
    assignment: Boolean, nodes: Set[Document.Node.Name], commands: Set[Command])

  sealed abstract class Phase
  {
    // "finished" / "failed" for Terminated, otherwise the lowercase object name
    def print: String =
      this match {
        case Terminated(result) => if (result.ok) "finished" else "failed"
        case _ => Word.lowercase(this.toString)
      }
  }
  case object Inactive extends Phase  // stable
  case object Startup extends Phase  // transient
  case object Ready extends Phase  // metastable
  case object Shutdown extends Phase  // transient
  case class Terminated(result: Process_Result) extends Phase  // stable
  //}}}


  /* syslog */

  // Bounded message log: the queue retains at most `limit` of the most recent
  // messages, while `length` counts every message ever added.
  private[Session] class Syslog(limit: Int)
  {
    private var queue = Queue.empty[XML.Elem]
    private var length = 0

    def += (msg: XML.Elem): Unit = synchronized {
      queue = queue.enqueue(msg)
      length += 1
      // drop the oldest entry once the total exceeds the limit
      if (length > limit) queue = queue.dequeue._2
    }

    // Retained messages as lines, plus a note about the total count if some
    // messages have been dropped.
    def content: String = synchronized {
      cat_lines(queue.iterator.map(XML.content)) +
      (if (length > limit) "\n(A total of " + length + " messages...)" else "")
    }
  }


  /* protocol handlers */

  abstract class Protocol_Handler
  {
    def init(session: Session): Unit = {}
    def exit(): Unit = {}
    val functions: List[(String, Prover.Protocol_Output => Boolean)]
  }
}


class Session(_session_options: => Options, val resources: Resources) extends Document.Session
{
  session =>

  val xml_cache: XML.Cache = XML.make_cache()
  val xz_cache: XZ.Cache = XZ.make_cache()


  /* global flags */

  @volatile var timing: Boolean = false
  @volatile var verbose: Boolean = false


  /* dynamic session options */

  // by-name constructor argument: re-evaluated on every access
  def session_options: Options = _session_options

  def output_delay: Time = session_options.seconds("editor_output_delay")
  def consolidate_delay: Time = session_options.seconds("editor_consolidate_delay")
  def prune_delay: Time = session_options.seconds("editor_prune_delay")
  def prune_size: Int = session_options.int("editor_prune_size")
  def syslog_limit: Int = session_options.int("editor_syslog_limit")
  def reparse_limit: Int = session_options.int("editor_reparse_limit")


  /* dispatcher */

  // thread that runs all outlet consumer callbacks
  private val dispatcher =
    Consumer_Thread.fork[() => Unit]("Session.dispatcher", daemon = true) { case e => e(); true }

  def assert_dispatcher[A](body: => A): A =
  {
    assert(dispatcher.check_thread)
    body
  }

  def require_dispatcher[A](body: => A): A =
  {
    require(dispatcher.check_thread)
    body
  }

  // run body directly when already on the dispatcher thread, otherwise enqueue it
  def send_dispatcher(body: => Unit): Unit =
  {
    if (dispatcher.check_thread) body
    else dispatcher.send(() => body)
  }

  // like send_dispatcher, but uses send_wait when called from another thread
  def send_wait_dispatcher(body: => Unit): Unit =
  {
    if (dispatcher.check_thread) body
    else dispatcher.send_wait(() => body)
  }


  /* outlets */

  val statistics = new Session.Outlet[Session.Statistics](dispatcher)
  val global_options = new Session.Outlet[Session.Global_Options](dispatcher)
  val caret_focus = new Session.Outlet[Session.Caret_Focus.type](dispatcher)
  val raw_edits = new Session.Outlet[Session.Raw_Edits](dispatcher)
  val commands_changed = new Session.Outlet[Session.Commands_Changed](dispatcher)
  val phase_changed = new Session.Outlet[Session.Phase](dispatcher)
  val syslog_messages = new Session.Outlet[Prover.Output](dispatcher)
  val raw_output_messages = new Session.Outlet[Prover.Output](dispatcher)
  val trace_events = new Session.Outlet[Simplifier_Trace.Event.type](dispatcher)
  val debugger_updates = new Session.Outlet[Debugger.Update.type](dispatcher)

  val all_messages = new Session.Outlet[Prover.Message](dispatcher)  // potential bottle-neck!


  /** main protocol manager **/

  /* internal messages */

  private case class Start(start_prover: Prover.Receiver => Prover)
  private case object Stop
  private case class Get_State(promise: Promise[Document.State])
  private case class Cancel_Exec(exec_id: Document_ID.Exec)
  private case class Protocol_Command(name: String, args: List[String])
  private case class Update_Options(options: Options)
  private case object Consolidate_Execution
  private case object Prune_History


  /* phase */

  // post the new phase to phase_changed and return it
  private def post_phase(new_phase: Session.Phase): Session.Phase =
  {
    phase_changed.post(new_phase)
    new_phase
  }
  private val _phase = Synchronized[Session.Phase](Session.Inactive)
  private def phase_=(new_phase: Session.Phase): Unit = _phase.change(_ => post_phase(new_phase))

  def phase = _phase.value
  def is_ready: Boolean = phase == Session.Ready


  /* syslog */

  // NOTE: syslog_limit is read once, when the session is constructed
  private val syslog = new Session.Syslog(syslog_limit)
  def syslog_content(): String = syslog.content


  /* pipelined change parsing */

  private case class Text_Edits(
    previous: Future[Document.Version],
    doc_blobs: Document.Blobs,
    text_edits: List[Document.Edit_Text],
    consolidate: List[Document.Node.Name],
    version_result: Promise[Document.Version])

  // Parses text edits relative to the finished previous version, fulfills the
  // promised version and hands the resulting change to the manager.
  private val change_parser = Consumer_Thread.fork[Text_Edits]("change_parser", daemon = true)
  {
    case Text_Edits(previous, doc_blobs, text_edits, consolidate, version_result) =>
      val prev = previous.get_finished
      val change =
        Timing.timeit("parse_change", timing) {
          resources.parse_change(reparse_limit, prev, doc_blobs, text_edits, consolidate)
        }
      version_result.fulfill(change.version)
      manager.send(change)
      true
  }


  /* buffered changes */

  // Accumulates changed nodes and commands, so that they are posted as a
  // single Commands_Changed event when delay_flush fires (or on shutdown).
  private object change_buffer
  {
    private var assignment: Boolean = false
    private var nodes: Set[Document.Node.Name] = Set.empty
    private var commands: Set[Command] = Set.empty

    // post pending content (if any), pass pending nodes to consolidation, reset
    def flush(): Unit = synchronized {
      if (assignment || nodes.nonEmpty || commands.nonEmpty)
        commands_changed.post(Session.Commands_Changed(assignment, nodes, commands))
      if (nodes.nonEmpty) consolidation.update(nodes)
      assignment = false
      nodes = Set.empty
      commands = Set.empty
    }
    private val delay_flush = Standard_Thread.delay_first(output_delay) { flush() }

    // Record edited nodes and commands; each command also contributes its own
    // node and the nodes of its blobs.
    def invoke(assign: Boolean, edited_nodes: List[Document.Node.Name], cmds: List[Command]): Unit =
      synchronized {
        assignment |= assign
        for (node <- edited_nodes) {
          nodes += node
        }
        for (command <- cmds) {
          nodes += command.node_name
          command.blobs_names.foreach(nodes += _)
          commands += command
        }
        delay_flush.invoke()
      }

    def shutdown(): Unit =
    {
      delay_flush.revoke()
      flush()
    }
  }


  /* postponed changes */

  // Changes that could not be handled yet, most recently stored first.
  private object postponed_changes
  {
    private var postponed: List[Session.Change] = Nil

    def store(change: Session.Change): Unit = synchronized { postponed ::= change }

    // Remove and return the changes whose previous version is assigned in the
    // given state, in the order in which they were stored.
    def flush(state: Document.State): List[Session.Change] = synchronized {
      val (assigned, unassigned) = postponed.partition(change => state.is_assigned(change.previous))
      postponed = unassigned
      assigned.reverse
    }
  }


  /* node consolidation */

  // Pending nodes for consolidation: Some(nodes) while active, None after exit().
  private object consolidation
  {
    private val delay =
      Standard_Thread.delay_first(consolidate_delay) { manager.send(Consolidate_Execution) }

    private val init_state: Option[Set[Document.Node.Name]] = Some(Set.empty)
    private val state = Synchronized(init_state)

    def exit(): Unit =
    {
      delay.revoke()
      state.change(_ => None)
    }

    // add nodes (while active) and schedule a Consolidate_Execution message
    def update(new_nodes: Set[Document.Node.Name] = Set.empty): Unit =
    {
      val active =
        state.change_result(st =>
          (st.isDefined, st.map(nodes => if (nodes.isEmpty) new_nodes else nodes ++ new_nodes)))
      if (active) delay.invoke()
    }

    // return pending nodes and reset to the empty set; stays None after exit()
    def flush(): Set[Document.Node.Name] =
      state.change_result(st => if (st.isDefined) (st.get, init_state) else (Set.empty, None))
  }


  /* prover process */

  private object prover
  {
    private val variable = Synchronized[Option[Prover]](None)

    def defined: Boolean = variable.value.isDefined
    def get: Prover = variable.value.get
    def set(p: Prover): Unit = variable.change(_ => Some(p))
    def reset: Unit = variable.change(_ => None)
    // NOTE(review): presumably blocks until the variable is None -- confirm
    // against Synchronized.guarded_access
    def await_reset(): Unit =
      variable.guarded_access({ case None => Some((), None) case _ => None })
  }


  /* file formats */

  lazy val file_formats: File_Format.Session =
    resources.file_formats.start_session(session)


  /* protocol handlers */

  private val protocol_handlers = Protocol_Handlers.init(session)

  def get_protocol_handler(name: String): Option[Session.Protocol_Handler] =
    protocol_handlers.get(name)

  def init_protocol_handler(handler: Session.Protocol_Handler): Unit =
    protocol_handlers.init(handler)

  def init_protocol_handler(name: String): Unit =
    protocol_handlers.init(name)


  /* debugger */

  private val debugger_handler = new Debugger.Handler(this)
  init_protocol_handler(debugger_handler)

  def debugger: Debugger = debugger_handler.debugger


  /* manager thread */

  private val delay_prune =
    Standard_Thread.delay_first(prune_delay) { manager.send(Prune_History) }

  private val manager: Consumer_Thread[Any] =
  {
    /* global state */
    val global_state = Synchronized(Document.State.init)


    /* raw edits */

    // Extend the history by a promised version and pass the edits to the
    // change_parser; non-empty edits discontinue the running execution first.
    def handle_raw_edits(
      doc_blobs: Document.Blobs = Document.Blobs.empty,
      edits: List[Document.Edit_Text] = Nil,
      consolidate: List[Document.Node.Name] = Nil): Unit =
    //{{{
    {
      require(prover.defined)

      if (edits.nonEmpty) prover.get.discontinue_execution()

      val previous = global_state.value.history.tip.version
      val version = Future.promise[Document.Version]
      global_state.change(_.continue_history(previous, edits, version))

      raw_edits.post(Session.Raw_Edits(doc_blobs, edits))
      change_parser.send(Text_Edits(previous, doc_blobs, edits, consolidate, version))
    }
    //}}}


    /* resulting changes */

    // Define new blobs and commands for the prover, define the new version in
    // the global state, then send the document update and commit the change.
    def handle_change(change: Session.Change): Unit =
    //{{{
    {
      require(prover.defined)

      // define commands
      {
        val id_commands = new mutable.ListBuffer[Command]
        def id_command(command: Command): Unit =
        {
          for {
            (name, digest) <- command.blobs_defined
            if !global_state.value.defined_blob(digest)
          } {
            change.version.nodes(name).get_blob match {
              case Some(blob) =>
                global_state.change(_.define_blob(digest))
                prover.get.define_blob(digest, blob.bytes)
              case None =>
                Output.error_message("Missing blob " + quote(name.toString))
            }
          }

          if (!global_state.value.defined_command(command.id)) {
            global_state.change(_.define_command(command))
            id_commands += command
          }
        }
        for { (_, edit) <- change.doc_edits } {
          edit.foreach({ case (c1, c2) => c1.foreach(id_command); c2.foreach(id_command) })
        }
        if (id_commands.nonEmpty) prover.get.define_commands_bulk(id_commands.toList)
      }

      val assignment = global_state.value.the_assignment(change.previous).check_finished
      global_state.change(_.define_version(change.version, assignment))

      prover.get.update(change.previous.id, change.version.id, change.doc_edits, change.consolidate)
      resources.commit(change)
    }
    //}}}


    /* prover output */

    // Interpret one prover output message: protocol messages go to the
    // protocol handlers first, then to the cases below; other messages are
    // accumulated into command states or drive the session phase.
    def handle_output(output: Prover.Output): Unit =
    //{{{
    {
      def bad_output(): Unit =
      {
        if (verbose)
          Output.warning("Ignoring bad prover output: " + output.message.toString)
      }

      // update the global state and buffer the affected command;
      // Document.State.Fail is treated as bad output
      def change_command(f: Document.State => (Command.State, Document.State)): Unit =
      {
        try {
          val st = global_state.change_result(f)
          change_buffer.invoke(false, Nil, List(st.command))
        }
        catch { case _: Document.State.Fail => bad_output() }
      }

      output match {
        case msg: Prover.Protocol_Output =>
          val handled = protocol_handlers.invoke(msg)
          if (!handled) {
            msg.properties match {
              case Markup.Protocol_Handler(name) if prover.defined =>
                init_protocol_handler(name)

              case Protocol.Command_Timing(state_id, timing) if prover.defined =>
                val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
                change_command(_.accumulate(state_id, xml_cache.elem(message), xml_cache))

              case Protocol.Theory_Timing(_, _) =>
                // FIXME

              case Markup.Export(args)
              if args.id.isDefined && Value.Long.unapply(args.id.get).isDefined =>
                val id = Value.Long.unapply(args.id.get).get
                val entry = Export.make_entry("", args, msg.bytes, cache = xz_cache)
                change_command(_.add_export(id, (args.serial, entry)))

              case Markup.Commands_Accepted =>
                msg.text match {
                  case Protocol.Commands_Accepted(ids) =>
                    ids.foreach(id =>
                      change_command(_.accumulate(id, Protocol.Commands_Accepted.message, xml_cache)))
                  case _ => bad_output()
                }

              case Markup.Assign_Update =>
                msg.text match {
                  case Protocol.Assign_Update(id, edited, update) =>
                    try {
                      val (edited_nodes, cmds) =
                        global_state.change_result(_.assign(id, edited, update))
                      change_buffer.invoke(true, edited_nodes, cmds)
                      manager.send(Session.Change_Flush)
                    }
                    catch { case _: Document.State.Fail => bad_output() }
                  case _ => bad_output()
                }
                delay_prune.invoke()

              case Markup.Removed_Versions =>
                msg.text match {
                  case Protocol.Removed(removed) =>
                    try {
                      global_state.change(_.removed_versions(removed))
                      manager.send(Session.Change_Flush)
                    }
                    catch { case _: Document.State.Fail => bad_output() }
                  case _ => bad_output()
                }

              case Markup.ML_Statistics(props) =>
                statistics.post(Session.Statistics(props))

              case Markup.Task_Statistics(props) =>
                // FIXME

              case _ => bad_output()
            }
          }
        case _ =>
          output.properties match {
            case Position.Id(state_id) =>
              change_command(_.accumulate(state_id, output.message, xml_cache))

            case _ if output.is_init =>
              prover.get.options(file_formats.prover_options(session_options))
              prover.get.session_base(resources)
              phase = Session.Ready
              debugger.ready()

            case Markup.Process_Result(result) if output.is_exit =>
              file_formats.stop_session
              phase = Session.Terminated(result)
              prover.reset

            case _ =>
              raw_output_messages.post(output)
          }
      }
    }
    //}}}


    /* main thread */

    Consumer_Thread.fork[Any]("Session.manager", daemon = true)
    {
      case arg: Any =>
        //{{{
        arg match {
          case output: Prover.Output =>
            if (output.is_stdout || output.is_stderr)
              raw_output_messages.post(output)
            else handle_output(output)

            if (output.is_syslog) {
              syslog += output.message
              syslog_messages.post(output)
            }

            all_messages.post(output)

          case input: Prover.Input =>
            all_messages.post(input)

          case Start(start_prover) if !prover.defined =>
            prover.set(start_prover(manager.send(_)))

          case Stop =>
            consolidation.exit()
            delay_prune.revoke()
            if (prover.defined) {
              protocol_handlers.exit()
              global_state.change(_ => Document.State.init)
              prover.get.terminate
            }

          case Get_State(promise) =>
            promise.fulfill(global_state.value)

          case Consolidate_Execution =>
            if (prover.defined) {
              val state = global_state.value
              state.stable_tip_version match {
                // no stable tip version yet: schedule another attempt
                case None => consolidation.update()
                case Some(version) =>
                  val consolidate =
                    consolidation.flush().iterator.filter(name =>
                      !resources.session_base.loaded_theory(name) &&
                      !state.node_consolidated(version, name) &&
                      state.node_maybe_consolidated(version, name)).toList
                  if (consolidate.nonEmpty) handle_raw_edits(consolidate = consolidate)
              }
            }

          case Prune_History =>
            if (prover.defined) {
              val old_versions = global_state.change_result(_.remove_versions(prune_size))
              if (old_versions.nonEmpty) prover.get.remove_versions(old_versions)
            }

          case Update_Options(options) =>
            if (prover.defined && is_ready) {
              prover.get.options(file_formats.prover_options(options))
              handle_raw_edits()
            }
            global_options.post(Session.Global_Options(options))

          case Cancel_Exec(exec_id) if prover.defined =>
            prover.get.cancel_exec(exec_id)

          case Session.Raw_Edits(doc_blobs, edits) if prover.defined =>
            handle_raw_edits(doc_blobs = doc_blobs, edits = edits)

          case Session.Dialog_Result(id, serial, result) if prover.defined =>
            prover.get.dialog_result(serial, result)
            handle_output(new Prover.Output(Protocol.Dialog_Result(id, serial, result)))

          case Protocol_Command(name, args) if prover.defined =>
            prover.get.protocol_command_args(name, args)

          case change: Session.Change if prover.defined =>
            val state = global_state.value
            // handle now only if no versions are being removed and the previous
            // version is already assigned; otherwise keep for Change_Flush
            if (!state.removing_versions && state.is_assigned(change.previous))
              handle_change(change)
            else postponed_changes.store(change)

          case Session.Change_Flush if prover.defined =>
            val state = global_state.value
            if (!state.removing_versions)
              postponed_changes.flush(state).foreach(handle_change(_))

          case bad =>
            if (verbose) Output.warning("Ignoring bad message: " + bad.toString)
        }
        true
        //}}}
    }
  }


  /* main operations */

  // Current document state as seen by the manager thread, or the initial
  // state if the manager is not active.
  def get_state(): Document.State =
  {
    if (manager.is_active) {
      val promise = Future.promise[Document.State]
      manager.send_wait(Get_State(promise))
      promise.join
    }
    else Document.State.init
  }

  def snapshot(name: Document.Node.Name = Document.Node.Name.empty,
      pending_edits: List[Text.Edit] = Nil): Document.Snapshot =
    get_state().snapshot(name, pending_edits)

  // syntax of the node in the recent finished version, with the overall
  // syntax of the session base as fallback
  def recent_syntax(name: Document.Node.Name): Outer_Syntax =
    get_state().recent_finished.version.get_finished.nodes(name).syntax getOrElse
    resources.session_base.overall_syntax

  // poll (sleeping for output_delay) until the snapshot is no longer outdated
  @tailrec final def await_stable_snapshot(): Document.Snapshot =
  {
    val snapshot = this.snapshot()
    if (snapshot.is_outdated) {
      Thread.sleep(output_delay.ms)
      await_stable_snapshot()
    }
    else snapshot
  }

  // Start the prover: only possible in phase Inactive, otherwise an error.
  def start(start_prover: Prover.Receiver => Prover): Unit =
  {
    file_formats  // force the lazy val before the prover starts
    _phase.change(
      {
        case Session.Inactive =>
          manager.send(Start(start_prover))
          post_phase(Session.Startup)
        case phase => error("Cannot start prover in phase " + quote(phase.print))
      })
  }

  // Request shutdown.  The Stop message is only sent when the session was
  // Ready; an Inactive session becomes Terminated immediately.
  // NOTE(review): Startup/Shutdown yield None, which presumably makes
  // guarded_access wait for a phase change -- confirm against Synchronized.
  def send_stop(): Unit =
  {
    val was_ready =
      _phase.guarded_access(phase =>
        phase match {
          case Session.Startup | Session.Shutdown => None
          case Session.Terminated(_) => Some((false, phase))
          case Session.Inactive => Some((false, post_phase(Session.Terminated(Process_Result(0)))))
          case Session.Ready => Some((true, post_phase(Session.Shutdown)))
        })
    if (was_ready) manager.send(Stop)
  }

  // Stop the session, shut down all session threads and return the process
  // result of the Terminated phase; any other final phase is an error.
  def stop(): Process_Result =
  {
    send_stop()
    prover.await_reset()

    change_parser.shutdown()
    change_buffer.shutdown()
    manager.shutdown()
    dispatcher.shutdown()

    phase match {
      case Session.Terminated(result) => result
      case phase => error("Bad session phase after shutdown: " + quote(phase.print))
    }
  }

  def protocol_command(name: String, args: String*): Unit =
    manager.send(Protocol_Command(name, args.toList))

  def cancel_exec(exec_id: Document_ID.Exec): Unit =
    manager.send(Cancel_Exec(exec_id))

  // empty edits are ignored; otherwise uses send_wait on the manager
  def update(doc_blobs: Document.Blobs, edits: List[Document.Edit_Text]): Unit =
  {
    if (edits.nonEmpty) manager.send_wait(Session.Raw_Edits(doc_blobs, edits))
  }

  def update_options(options: Options): Unit =
    manager.send_wait(Update_Options(options))

  def dialog_result(id: Document_ID.Generic, serial: Long, result: String): Unit =
    manager.send(Session.Dialog_Result(id, serial, result))
}