1//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// 2// 3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4// See https://llvm.org/LICENSE.txt for license information. 5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6// 7//===----------------------------------------------------------------------===// 8// 9// This file defines a crude C++11 based task queue. 10// 11//===----------------------------------------------------------------------===// 12 13#ifndef LLVM_SUPPORT_TASK_QUEUE_H 14#define LLVM_SUPPORT_TASK_QUEUE_H 15 16#include "llvm/Config/llvm-config.h" 17#include "llvm/Support/ThreadPool.h" 18#include "llvm/Support/thread.h" 19 20#include <atomic> 21#include <cassert> 22#include <condition_variable> 23#include <deque> 24#include <functional> 25#include <future> 26#include <memory> 27#include <mutex> 28#include <utility> 29 30namespace llvm { 31/// TaskQueue executes serialized work on a user-defined Thread Pool. It 32/// guarantees that if task B is enqueued after task A, task B begins after 33/// task A completes and there is no overlap between the two. 34class TaskQueue { 35 // Because we don't have init capture to use move-only local variables that 36 // are captured into a lambda, we create the promise inside an explicit 37 // callable struct. We want to do as much of the wrapping in the 38 // type-specialized domain (before type erasure) and then erase this into a 39 // std::function. 40 template <typename Callable> struct Task { 41 using ResultTy = typename std::result_of<Callable()>::type; 42 explicit Task(Callable C, TaskQueue &Parent) 43 : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), 44 Parent(&Parent) {} 45 46 template<typename T> 47 void invokeCallbackAndSetPromise(T*) { 48 P->set_value(C()); 49 } 50 51 void invokeCallbackAndSetPromise(void*) { 52 C(); 53 P->set_value(); 54 } 55 56 void operator()() noexcept { 57 ResultTy *Dummy = nullptr; 58 invokeCallbackAndSetPromise(Dummy); 59 Parent->completeTask(); 60 } 61 62 Callable C; 63 std::shared_ptr<std::promise<ResultTy>> P; 64 TaskQueue *Parent; 65 }; 66 67public: 68 /// Construct a task queue with no work. 69 TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; } 70 71 /// Blocking destructor: the queue will wait for all work to complete. 72 ~TaskQueue() { 73 Scheduler.wait(); 74 assert(Tasks.empty()); 75 } 76 77 /// Asynchronous submission of a task to the queue. The returned future can be 78 /// used to wait for the task (and all previous tasks that have not yet 79 /// completed) to finish. 80 template <typename Callable> 81 std::future<typename std::result_of<Callable()>::type> async(Callable &&C) { 82#if !LLVM_ENABLE_THREADS 83 static_assert(false, 84 "TaskQueue requires building with LLVM_ENABLE_THREADS!"); 85#endif 86 Task<Callable> T{std::move(C), *this}; 87 using ResultTy = typename std::result_of<Callable()>::type; 88 std::future<ResultTy> F = T.P->get_future(); 89 { 90 std::lock_guard<std::mutex> Lock(QueueLock); 91 // If there's already a task in flight, just queue this one up. If 92 // there is not a task in flight, bypass the queue and schedule this 93 // task immediately. 94 if (IsTaskInFlight) 95 Tasks.push_back(std::move(T)); 96 else { 97 Scheduler.async(std::move(T)); 98 IsTaskInFlight = true; 99 } 100 } 101 return std::move(F); 102 } 103 104private: 105 void completeTask() { 106 // We just completed a task. If there are no more tasks in the queue, 107 // update IsTaskInFlight to false and stop doing work. Otherwise 108 // schedule the next task (while not holding the lock). 109 std::function<void()> Continuation; 110 { 111 std::lock_guard<std::mutex> Lock(QueueLock); 112 if (Tasks.empty()) { 113 IsTaskInFlight = false; 114 return; 115 } 116 117 Continuation = std::move(Tasks.front()); 118 Tasks.pop_front(); 119 } 120 Scheduler.async(std::move(Continuation)); 121 } 122 123 /// The thread pool on which to run the work. 124 ThreadPool &Scheduler; 125 126 /// State which indicates whether the queue currently is currently processing 127 /// any work. 128 bool IsTaskInFlight = false; 129 130 /// Mutex for synchronizing access to the Tasks array. 131 std::mutex QueueLock; 132 133 /// Tasks waiting for execution in the queue. 134 std::deque<std::function<void()>> Tasks; 135}; 136} // namespace llvm 137 138#endif // LLVM_SUPPORT_TASK_QUEUE_H 139