1/*  Title:      Pure/Concurrent/par_list.scala
2    Author:     Makarius
3
4Parallel list combinators.
5*/
6
7
8package isabelle
9
10
object Par_List
{
  /* managed results */

  /* Apply f to each element of xs and return one Exn.Result per element, in
     the order of xs.

     A list with at most one element is processed directly via xs.map, without
     forking.  Otherwise every application of f runs inside its own
     Future.fork.  When an application raises an exception, the other futures
     are cancelled (at most once overall) and the exception is thrown again
     inside the failing future.  On leaving the function, a final cancel is
     issued to all futures unless cancellation has already happened. */
  def managed_results[A, B](f: A => B, xs: List[A]): List[Exn.Result[B]] =
    xs match {
      case Nil | _ :: Nil => xs.map(x => Exn.capture { f(x) })
      case _ =>
        // forked futures, paired with a flag telling whether cancel was already issued
        val shared = Synchronized[(List[Future[B]], Boolean)]((Nil, false))

        // Cancel every registered future except the one at index self; the
        // default -1 matches no index, so nothing is exempt.
        def cancel_other(self: Int = -1): Unit =
          shared.change(st => {
            val (futures, canceled) = st
            if (!canceled) {
              futures.iterator.zipWithIndex.foreach({ case (future, i) =>
                if (i != self) future.cancel
              })
            }
            (futures, true)
          })

        // Evaluate f(x) in a forked future; a failure cancels the siblings
        // before the exception is passed on.
        def fork_item(x: A, self: Int): Future[B] =
          Future.fork {
            Exn.capture { f(x) } match {
              case Exn.Res(res) => res
              case Exn.Exn(exn) => cancel_other(self); throw exn
            }
          }

        try {
          // Forking happens inside the change function, so the complete list
          // of futures is stored before any cancel_other can inspect it.
          // NOTE(review): relies on Synchronized.change being mutually exclusive -- confirm.
          shared.change(_ =>
            (xs.iterator.zipWithIndex.map({ case (x, self) => fork_item(x, self) }).toList,
              false))

          val futures = shared.value._1
          futures.map(_.join_result)
        }
        finally { cancel_other() }
    }


  /* derived combinators */

  /* Evaluate f on all elements of xs via managed_results and pass the results
     through Exn.release_first (presumably: plain values, or the first
     exception raised again -- see Exn). */
  def map[A, B](f: A => B, xs: List[A]): List[B] =
  {
    val results = managed_results(f, xs)
    Exn.release_first(results)
  }

  /* Search xs for an element where f yields Some value.  A hit is signalled by
     throwing the local exception Found, which makes managed_results cancel the
     other futures.  The first Found in list order determines the result.
     Without any hit, the results go through Exn.release_first before None is
     returned. */
  def get_some[A, B](f: A => Option[B], xs: List[A]): Option[B] =
  {
    class Found(val res: B) extends Exception

    val probe: A => Unit =
      (x: A) => f(x) match {
        case Some(y) => throw new Found(y)
        case None => ()
      }

    val results = managed_results(probe, xs)
    val hit = results.collectFirst({ case Exn.Exn(e: Found) => e.res })

    if (hit.isEmpty) Exn.release_first(results)
    hit
  }

  /* Some element of xs that satisfies P, if get_some detects one. */
  def find_some[A](P: A => Boolean, xs: List[A]): Option[A] =
    get_some((x: A) => Some(x).filter(P), xs)

  /* P holds for some element of xs. */
  def exists[A](P: A => Boolean, xs: List[A]): Boolean =
    find_some(P, xs).nonEmpty

  /* P holds for all elements of xs, i.e. no element violates P. */
  def forall[A](P: A => Boolean, xs: List[A]): Boolean =
    find_some((x: A) => !P(x), xs).isEmpty
}
64