1336809Sdim//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// 2336809Sdim// 3353358Sdim// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4353358Sdim// See https://llvm.org/LICENSE.txt for license information. 5353358Sdim// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6336809Sdim// 7336809Sdim//===----------------------------------------------------------------------===// 8336809Sdim// 9336809Sdim// This file defines a crude C++11 based task queue. 10336809Sdim// 11336809Sdim//===----------------------------------------------------------------------===// 12336809Sdim 13336809Sdim#ifndef LLVM_SUPPORT_TASK_QUEUE_H 14336809Sdim#define LLVM_SUPPORT_TASK_QUEUE_H 15336809Sdim 16336809Sdim#include "llvm/Config/llvm-config.h" 17336809Sdim#include "llvm/Support/ThreadPool.h" 18336809Sdim#include "llvm/Support/thread.h" 19336809Sdim 20336809Sdim#include <atomic> 21336809Sdim#include <cassert> 22336809Sdim#include <condition_variable> 23336809Sdim#include <deque> 24336809Sdim#include <functional> 25336809Sdim#include <future> 26336809Sdim#include <memory> 27336809Sdim#include <mutex> 28336809Sdim#include <utility> 29336809Sdim 30336809Sdimnamespace llvm { 31336809Sdim/// TaskQueue executes serialized work on a user-defined Thread Pool. It 32336809Sdim/// guarantees that if task B is enqueued after task A, task B begins after 33336809Sdim/// task A completes and there is no overlap between the two. 34336809Sdimclass TaskQueue { 35336809Sdim // Because we don't have init capture to use move-only local variables that 36336809Sdim // are captured into a lambda, we create the promise inside an explicit 37336809Sdim // callable struct. We want to do as much of the wrapping in the 38336809Sdim // type-specialized domain (before type erasure) and then erase this into a 39336809Sdim // std::function. 40336809Sdim template <typename Callable> struct Task { 41336809Sdim using ResultTy = typename std::result_of<Callable()>::type; 42336809Sdim explicit Task(Callable C, TaskQueue &Parent) 43336809Sdim : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), 44336809Sdim Parent(&Parent) {} 45336809Sdim 46336809Sdim template<typename T> 47336809Sdim void invokeCallbackAndSetPromise(T*) { 48336809Sdim P->set_value(C()); 49336809Sdim } 50336809Sdim 51336809Sdim void invokeCallbackAndSetPromise(void*) { 52336809Sdim C(); 53336809Sdim P->set_value(); 54336809Sdim } 55336809Sdim 56336809Sdim void operator()() noexcept { 57336809Sdim ResultTy *Dummy = nullptr; 58336809Sdim invokeCallbackAndSetPromise(Dummy); 59336809Sdim Parent->completeTask(); 60336809Sdim } 61336809Sdim 62336809Sdim Callable C; 63336809Sdim std::shared_ptr<std::promise<ResultTy>> P; 64336809Sdim TaskQueue *Parent; 65336809Sdim }; 66336809Sdim 67336809Sdimpublic: 68336809Sdim /// Construct a task queue with no work. 69336809Sdim TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; } 70336809Sdim 71336809Sdim /// Blocking destructor: the queue will wait for all work to complete. 72336809Sdim ~TaskQueue() { 73336809Sdim Scheduler.wait(); 74336809Sdim assert(Tasks.empty()); 75336809Sdim } 76336809Sdim 77336809Sdim /// Asynchronous submission of a task to the queue. The returned future can be 78336809Sdim /// used to wait for the task (and all previous tasks that have not yet 79336809Sdim /// completed) to finish. 80336809Sdim template <typename Callable> 81336809Sdim std::future<typename std::result_of<Callable()>::type> async(Callable &&C) { 82336809Sdim#if !LLVM_ENABLE_THREADS 83336809Sdim static_assert(false, 84336809Sdim "TaskQueue requires building with LLVM_ENABLE_THREADS!"); 85336809Sdim#endif 86336809Sdim Task<Callable> T{std::move(C), *this}; 87336809Sdim using ResultTy = typename std::result_of<Callable()>::type; 88336809Sdim std::future<ResultTy> F = T.P->get_future(); 89336809Sdim { 90336809Sdim std::lock_guard<std::mutex> Lock(QueueLock); 91336809Sdim // If there's already a task in flight, just queue this one up. If 92336809Sdim // there is not a task in flight, bypass the queue and schedule this 93336809Sdim // task immediately. 94336809Sdim if (IsTaskInFlight) 95336809Sdim Tasks.push_back(std::move(T)); 96336809Sdim else { 97336809Sdim Scheduler.async(std::move(T)); 98336809Sdim IsTaskInFlight = true; 99336809Sdim } 100336809Sdim } 101336809Sdim return std::move(F); 102336809Sdim } 103336809Sdim 104336809Sdimprivate: 105336809Sdim void completeTask() { 106336809Sdim // We just completed a task. If there are no more tasks in the queue, 107336809Sdim // update IsTaskInFlight to false and stop doing work. Otherwise 108336809Sdim // schedule the next task (while not holding the lock). 109336809Sdim std::function<void()> Continuation; 110336809Sdim { 111336809Sdim std::lock_guard<std::mutex> Lock(QueueLock); 112336809Sdim if (Tasks.empty()) { 113336809Sdim IsTaskInFlight = false; 114336809Sdim return; 115336809Sdim } 116336809Sdim 117336809Sdim Continuation = std::move(Tasks.front()); 118336809Sdim Tasks.pop_front(); 119336809Sdim } 120336809Sdim Scheduler.async(std::move(Continuation)); 121336809Sdim } 122336809Sdim 123336809Sdim /// The thread pool on which to run the work. 124336809Sdim ThreadPool &Scheduler; 125336809Sdim 126336809Sdim /// State which indicates whether the queue currently is currently processing 127336809Sdim /// any work. 128336809Sdim bool IsTaskInFlight = false; 129336809Sdim 130336809Sdim /// Mutex for synchronizing access to the Tasks array. 131336809Sdim std::mutex QueueLock; 132336809Sdim 133336809Sdim /// Tasks waiting for execution in the queue. 134336809Sdim std::deque<std::function<void()>> Tasks; 135336809Sdim}; 136336809Sdim} // namespace llvm 137336809Sdim 138336809Sdim#endif // LLVM_SUPPORT_TASK_QUEUE_H 139