1336809Sdim//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
2336809Sdim//
3353358Sdim// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4353358Sdim// See https://llvm.org/LICENSE.txt for license information.
5353358Sdim// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6336809Sdim//
7336809Sdim//===----------------------------------------------------------------------===//
8336809Sdim//
9336809Sdim// This file defines a crude C++11 based task queue.
10336809Sdim//
11336809Sdim//===----------------------------------------------------------------------===//
12336809Sdim
13336809Sdim#ifndef LLVM_SUPPORT_TASK_QUEUE_H
14336809Sdim#define LLVM_SUPPORT_TASK_QUEUE_H
15336809Sdim
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/thread.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
29336809Sdim
30336809Sdimnamespace llvm {
31336809Sdim/// TaskQueue executes serialized work on a user-defined Thread Pool.  It
32336809Sdim/// guarantees that if task B is enqueued after task A, task B begins after
33336809Sdim/// task A completes and there is no overlap between the two.
34336809Sdimclass TaskQueue {
35336809Sdim  // Because we don't have init capture to use move-only local variables that
36336809Sdim  // are captured into a lambda, we create the promise inside an explicit
37336809Sdim  // callable struct. We want to do as much of the wrapping in the
38336809Sdim  // type-specialized domain (before type erasure) and then erase this into a
39336809Sdim  // std::function.
40336809Sdim  template <typename Callable> struct Task {
41336809Sdim    using ResultTy = typename std::result_of<Callable()>::type;
42336809Sdim    explicit Task(Callable C, TaskQueue &Parent)
43336809Sdim        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
44336809Sdim          Parent(&Parent) {}
45336809Sdim
46336809Sdim    template<typename T>
47336809Sdim    void invokeCallbackAndSetPromise(T*) {
48336809Sdim      P->set_value(C());
49336809Sdim    }
50336809Sdim
51336809Sdim    void invokeCallbackAndSetPromise(void*) {
52336809Sdim      C();
53336809Sdim      P->set_value();
54336809Sdim    }
55336809Sdim
56336809Sdim    void operator()() noexcept {
57336809Sdim      ResultTy *Dummy = nullptr;
58336809Sdim      invokeCallbackAndSetPromise(Dummy);
59336809Sdim      Parent->completeTask();
60336809Sdim    }
61336809Sdim
62336809Sdim    Callable C;
63336809Sdim    std::shared_ptr<std::promise<ResultTy>> P;
64336809Sdim    TaskQueue *Parent;
65336809Sdim  };
66336809Sdim
67336809Sdimpublic:
68336809Sdim  /// Construct a task queue with no work.
69336809Sdim  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
70336809Sdim
71336809Sdim  /// Blocking destructor: the queue will wait for all work to complete.
72336809Sdim  ~TaskQueue() {
73336809Sdim    Scheduler.wait();
74336809Sdim    assert(Tasks.empty());
75336809Sdim  }
76336809Sdim
77336809Sdim  /// Asynchronous submission of a task to the queue. The returned future can be
78336809Sdim  /// used to wait for the task (and all previous tasks that have not yet
79336809Sdim  /// completed) to finish.
80336809Sdim  template <typename Callable>
81336809Sdim  std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
82336809Sdim#if !LLVM_ENABLE_THREADS
83336809Sdim    static_assert(false,
84336809Sdim                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
85336809Sdim#endif
86336809Sdim    Task<Callable> T{std::move(C), *this};
87336809Sdim    using ResultTy = typename std::result_of<Callable()>::type;
88336809Sdim    std::future<ResultTy> F = T.P->get_future();
89336809Sdim    {
90336809Sdim      std::lock_guard<std::mutex> Lock(QueueLock);
91336809Sdim      // If there's already a task in flight, just queue this one up.  If
92336809Sdim      // there is not a task in flight, bypass the queue and schedule this
93336809Sdim      // task immediately.
94336809Sdim      if (IsTaskInFlight)
95336809Sdim        Tasks.push_back(std::move(T));
96336809Sdim      else {
97336809Sdim        Scheduler.async(std::move(T));
98336809Sdim        IsTaskInFlight = true;
99336809Sdim      }
100336809Sdim    }
101336809Sdim    return std::move(F);
102336809Sdim  }
103336809Sdim
104336809Sdimprivate:
105336809Sdim  void completeTask() {
106336809Sdim    // We just completed a task.  If there are no more tasks in the queue,
107336809Sdim    // update IsTaskInFlight to false and stop doing work.  Otherwise
108336809Sdim    // schedule the next task (while not holding the lock).
109336809Sdim    std::function<void()> Continuation;
110336809Sdim    {
111336809Sdim      std::lock_guard<std::mutex> Lock(QueueLock);
112336809Sdim      if (Tasks.empty()) {
113336809Sdim        IsTaskInFlight = false;
114336809Sdim        return;
115336809Sdim      }
116336809Sdim
117336809Sdim      Continuation = std::move(Tasks.front());
118336809Sdim      Tasks.pop_front();
119336809Sdim    }
120336809Sdim    Scheduler.async(std::move(Continuation));
121336809Sdim  }
122336809Sdim
123336809Sdim  /// The thread pool on which to run the work.
124336809Sdim  ThreadPool &Scheduler;
125336809Sdim
126336809Sdim  /// State which indicates whether the queue currently is currently processing
127336809Sdim  /// any work.
128336809Sdim  bool IsTaskInFlight = false;
129336809Sdim
130336809Sdim  /// Mutex for synchronizing access to the Tasks array.
131336809Sdim  std::mutex QueueLock;
132336809Sdim
133336809Sdim  /// Tasks waiting for execution in the queue.
134336809Sdim  std::deque<std::function<void()>> Tasks;
135336809Sdim};
136336809Sdim} // namespace llvm
137336809Sdim
138336809Sdim#endif // LLVM_SUPPORT_TASK_QUEUE_H
139