1/* Title: Pure/Concurrent/future.scala 2 Author: Makarius 3 4Value-oriented parallel execution via futures and promises. 5*/ 6 7package isabelle 8 9 10import java.util.concurrent.Callable 11 12 13/* futures and promises */ 14 15object Future 16{ 17 def value[A](x: A): Future[A] = new Value_Future(x) 18 def fork[A](body: => A): Future[A] = new Task_Future[A](body) 19 def promise[A]: Promise[A] = new Promise_Future[A] 20 def thread[A](name: String = "", daemon: Boolean = false)(body: => A): Future[A] = 21 new Thread_Future[A](name, daemon, body) 22} 23 24trait Future[A] 25{ 26 def peek: Option[Exn.Result[A]] 27 def is_finished: Boolean = peek.isDefined 28 def get_finished: A = { require(is_finished); Exn.release(peek.get) } 29 def join_result: Exn.Result[A] 30 def join: A = Exn.release(join_result) 31 def map[B](f: A => B): Future[B] = Future.fork { f(join) } 32 def cancel: Unit 33 34 override def toString: String = 35 peek match { 36 case None => "<future>" 37 case Some(Exn.Exn(_)) => "<failed>" 38 case Some(Exn.Res(x)) => x.toString 39 } 40} 41 42trait Promise[A] extends Future[A] 43{ 44 def fulfill_result(res: Exn.Result[A]): Unit 45 def fulfill(x: A): Unit 46} 47 48 49/* value future */ 50 51private class Value_Future[A](x: A) extends Future[A] 52{ 53 val peek: Option[Exn.Result[A]] = Some(Exn.Res(x)) 54 def join_result: Exn.Result[A] = peek.get 55 def cancel {} 56} 57 58 59/* task future via thread pool */ 60 61private class Task_Future[A](body: => A) extends Future[A] 62{ 63 private sealed abstract class Status 64 private case object Ready extends Status 65 private case class Running(thread: Thread) extends Status 66 private case object Terminated extends Status 67 private case class Finished(result: Exn.Result[A]) extends Status 68 69 private val status = Synchronized[Status](Ready) 70 71 def peek: Option[Exn.Result[A]] = 72 status.value match { 73 case Finished(result) => Some(result) 74 case _ => None 75 } 76 77 private def try_run() 78 { 79 val do_run = 80 status.change_result { 81 case Ready => (true, Running(Thread.currentThread)) 82 case st => (false, st) 83 } 84 if (do_run) { 85 val result = Exn.capture(body) 86 status.change(_ => Terminated) 87 status.change(_ => Finished(if (Thread.interrupted) Exn.Exn(Exn.Interrupt()) else result)) 88 } 89 } 90 private val task = Standard_Thread.pool.submit(new Callable[Unit] { def call = try_run() }) 91 92 def join_result: Exn.Result[A] = 93 { 94 try_run() 95 status.guarded_access { 96 case st @ Finished(result) => Some((result, st)) 97 case _ => None 98 } 99 } 100 101 def cancel = 102 { 103 status.change { 104 case Ready => task.cancel(false); Finished(Exn.Exn(Exn.Interrupt())) 105 case st @ Running(thread) => thread.interrupt; st 106 case st => st 107 } 108 } 109} 110 111 112/* promise future */ 113 114private class Promise_Future[A] extends Promise[A] 115{ 116 private val state = Synchronized[Option[Exn.Result[A]]](None) 117 def peek: Option[Exn.Result[A]] = state.value 118 119 def join_result: Exn.Result[A] = 120 state.guarded_access(st => if (st.isEmpty) None else Some((st.get, st))) 121 122 def fulfill_result(result: Exn.Result[A]): Unit = 123 state.change(st => if (st.isEmpty) Some(result) else throw new IllegalStateException) 124 125 def fulfill(x: A): Unit = fulfill_result(Exn.Res(x)) 126 127 def cancel: Unit = 128 state.change(st => if (st.isEmpty) Some(Exn.Exn(Exn.Interrupt())) else st) 129} 130 131 132/* thread future */ 133 134private class Thread_Future[A](name: String, daemon: Boolean, body: => A) extends Future[A] 135{ 136 private val result = Future.promise[A] 137 private val thread = 138 Standard_Thread.fork(name, daemon) { result.fulfill_result(Exn.capture(body)) } 139 140 def peek: Option[Exn.Result[A]] = result.peek 141 def join_result: Exn.Result[A] = result.join_result 142 def cancel: Unit = thread.interrupt 143} 144