1/*  Title:      Pure/Concurrent/consumer_thread.scala
2    Author:     Makarius
3
4Consumer thread with unbounded queueing of requests, and optional
5acknowledgment.
6*/
7
8package isabelle
9
10
11import scala.annotation.tailrec
12
13
14object Consumer_Thread
15{
16  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
17      bulk: A => Boolean,
18      consume: List[A] => (List[Exn.Result[Unit]], Boolean),
19      finish: () => Unit = () => ()): Consumer_Thread[A] =
20    new Consumer_Thread[A](name, daemon, bulk, consume, finish)
21
22  def fork[A](name: String = "", daemon: Boolean = false)(
23      consume: A => Boolean,
24      finish: () => Unit = () => ()): Consumer_Thread[A] =
25  {
26    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
27    {
28      assert(args.length == 1)
29      Exn.capture { consume(args.head) } match {
30        case Exn.Res(continue) => (List(Exn.Res(())), continue)
31        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
32      }
33    }
34
35    fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
36  }
37}
38
39final class Consumer_Thread[A] private(
40  name: String, daemon: Boolean,
41  bulk: A => Boolean,
42  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
43  finish: () => Unit)
44{
45  /* thread */
46
47  private var active = true
48  private val mailbox = Mailbox[Option[Request]]
49
50  private val thread = Standard_Thread.fork(name, daemon) { main_loop(Nil) }
51  def is_active: Boolean = active && thread.isAlive
52  def check_thread: Boolean = Thread.currentThread == thread
53
54  private def failure(exn: Throwable): Unit =
55    Output.error_message(
56      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))
57
58  private def robust_finish(): Unit =
59    try { finish() } catch { case exn: Throwable => failure(exn) }
60
61
62  /* requests */
63
64  private class Request(val arg: A, acknowledge: Boolean = false)
65  {
66    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
67      if (acknowledge) Some(Synchronized(None)) else None
68
69    def await
70    {
71      for (a <- ack) {
72        Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
73      }
74    }
75  }
76
77  private def request(req: Request)
78  {
79    synchronized {
80      if (is_active) mailbox.send(Some(req))
81      else error("Consumer thread not active: " + quote(thread.getName))
82    }
83    req.await
84  }
85
86  @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
87    msgs match {
88      case Nil => main_loop(mailbox.receive())
89      case None :: _ => robust_finish()
90      case _ =>
91        val reqs =
92          proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
93            .getOrElse(msgs.take(1))
94            .map(_.get)
95
96        val (results, continue) = consume(reqs.map(_.arg))
97
98        for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
99        {
100          (req.ack, res) match {
101            case ((Some(a), _)) => a.change(_ => Some(res))
102            case ((None, Exn.Res(_))) =>
103            case ((None, Exn.Exn(exn))) => failure(exn)
104          }
105        }
106
107        if (continue) main_loop(msgs.drop(reqs.length))
108        else robust_finish()
109    }
110
111
112  /* main methods */
113
114  assert(is_active)
115
116  def send(arg: A) { request(new Request(arg)) }
117  def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) }
118
119  def shutdown()
120  {
121    synchronized { if (is_active) { active = false; mailbox.send(None) } }
122    thread.join
123  }
124}
125