1//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// This file defines a crude C++11 based task queue.
10//
11//===----------------------------------------------------------------------===//
12
13#ifndef LLVM_SUPPORT_TASK_QUEUE_H
14#define LLVM_SUPPORT_TASK_QUEUE_H
15
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/thread.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <type_traits>
#include <utility>
29
30namespace llvm {
31/// TaskQueue executes serialized work on a user-defined Thread Pool.  It
32/// guarantees that if task B is enqueued after task A, task B begins after
33/// task A completes and there is no overlap between the two.
34class TaskQueue {
35  // Because we don't have init capture to use move-only local variables that
36  // are captured into a lambda, we create the promise inside an explicit
37  // callable struct. We want to do as much of the wrapping in the
38  // type-specialized domain (before type erasure) and then erase this into a
39  // std::function.
40  template <typename Callable> struct Task {
41    using ResultTy = typename std::result_of<Callable()>::type;
42    explicit Task(Callable C, TaskQueue &Parent)
43        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
44          Parent(&Parent) {}
45
46    template<typename T>
47    void invokeCallbackAndSetPromise(T*) {
48      P->set_value(C());
49    }
50
51    void invokeCallbackAndSetPromise(void*) {
52      C();
53      P->set_value();
54    }
55
56    void operator()() noexcept {
57      ResultTy *Dummy = nullptr;
58      invokeCallbackAndSetPromise(Dummy);
59      Parent->completeTask();
60    }
61
62    Callable C;
63    std::shared_ptr<std::promise<ResultTy>> P;
64    TaskQueue *Parent;
65  };
66
67public:
68  /// Construct a task queue with no work.
69  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
70
71  /// Blocking destructor: the queue will wait for all work to complete.
72  ~TaskQueue() {
73    Scheduler.wait();
74    assert(Tasks.empty());
75  }
76
77  /// Asynchronous submission of a task to the queue. The returned future can be
78  /// used to wait for the task (and all previous tasks that have not yet
79  /// completed) to finish.
80  template <typename Callable>
81  std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
82#if !LLVM_ENABLE_THREADS
83    static_assert(false,
84                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
85#endif
86    Task<Callable> T{std::move(C), *this};
87    using ResultTy = typename std::result_of<Callable()>::type;
88    std::future<ResultTy> F = T.P->get_future();
89    {
90      std::lock_guard<std::mutex> Lock(QueueLock);
91      // If there's already a task in flight, just queue this one up.  If
92      // there is not a task in flight, bypass the queue and schedule this
93      // task immediately.
94      if (IsTaskInFlight)
95        Tasks.push_back(std::move(T));
96      else {
97        Scheduler.async(std::move(T));
98        IsTaskInFlight = true;
99      }
100    }
101    return std::move(F);
102  }
103
104private:
105  void completeTask() {
106    // We just completed a task.  If there are no more tasks in the queue,
107    // update IsTaskInFlight to false and stop doing work.  Otherwise
108    // schedule the next task (while not holding the lock).
109    std::function<void()> Continuation;
110    {
111      std::lock_guard<std::mutex> Lock(QueueLock);
112      if (Tasks.empty()) {
113        IsTaskInFlight = false;
114        return;
115      }
116
117      Continuation = std::move(Tasks.front());
118      Tasks.pop_front();
119    }
120    Scheduler.async(std::move(Continuation));
121  }
122
123  /// The thread pool on which to run the work.
124  ThreadPool &Scheduler;
125
126  /// State which indicates whether the queue currently is currently processing
127  /// any work.
128  bool IsTaskInFlight = false;
129
130  /// Mutex for synchronizing access to the Tasks array.
131  std::mutex QueueLock;
132
133  /// Tasks waiting for execution in the queue.
134  std::deque<std::function<void()>> Tasks;
135};
136} // namespace llvm
137
138#endif // LLVM_SUPPORT_TASK_QUEUE_H
139