1292915Sdim//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// 2292915Sdim// 3353358Sdim// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4353358Sdim// See https://llvm.org/LICENSE.txt for license information. 5353358Sdim// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6292915Sdim// 7292915Sdim//===----------------------------------------------------------------------===// 8292915Sdim// 9292915Sdim// This file implements a crude C++11 based thread pool. 10292915Sdim// 11292915Sdim//===----------------------------------------------------------------------===// 12292915Sdim 13292915Sdim#include "llvm/Support/ThreadPool.h" 14292915Sdim 15292915Sdim#include "llvm/Config/llvm-config.h" 16327952Sdim#include "llvm/Support/Threading.h" 17292915Sdim#include "llvm/Support/raw_ostream.h" 18292915Sdim 19292915Sdimusing namespace llvm; 20292915Sdim 21292915Sdim#if LLVM_ENABLE_THREADS 22292915Sdim 23327952Sdim// Default to hardware_concurrency 24327952SdimThreadPool::ThreadPool() : ThreadPool(hardware_concurrency()) {} 25292915Sdim 26292915SdimThreadPool::ThreadPool(unsigned ThreadCount) 27292915Sdim : ActiveThreads(0), EnableFlag(true) { 28292915Sdim // Create ThreadCount threads that will loop forever, wait on QueueCondition 29292915Sdim // for tasks to be queued or the Pool to be destroyed. 30292915Sdim Threads.reserve(ThreadCount); 31292915Sdim for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) { 32292915Sdim Threads.emplace_back([&] { 33292915Sdim while (true) { 34292915Sdim PackagedTaskTy Task; 35292915Sdim { 36292915Sdim std::unique_lock<std::mutex> LockGuard(QueueLock); 37292915Sdim // Wait for tasks to be pushed in the queue 38292915Sdim QueueCondition.wait(LockGuard, 39292915Sdim [&] { return !EnableFlag || !Tasks.empty(); }); 40292915Sdim // Exit condition 41292915Sdim if (!EnableFlag && Tasks.empty()) 42292915Sdim return; 43292915Sdim // Yeah, we have a task, grab it and release the lock on the queue 44292915Sdim 45292915Sdim // We first need to signal that we are active before popping the queue 46292915Sdim // in order for wait() to properly detect that even if the queue is 47292915Sdim // empty, there is still a task in flight. 48292915Sdim { 49327952Sdim std::unique_lock<std::mutex> LockGuard(CompletionLock); 50292915Sdim ++ActiveThreads; 51292915Sdim } 52292915Sdim Task = std::move(Tasks.front()); 53292915Sdim Tasks.pop(); 54292915Sdim } 55292915Sdim // Run the task we just grabbed 56292915Sdim Task(); 57292915Sdim 58292915Sdim { 59292915Sdim // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() 60292915Sdim std::unique_lock<std::mutex> LockGuard(CompletionLock); 61292915Sdim --ActiveThreads; 62292915Sdim } 63292915Sdim 64292915Sdim // Notify task completion, in case someone waits on ThreadPool::wait() 65292915Sdim CompletionCondition.notify_all(); 66292915Sdim } 67292915Sdim }); 68292915Sdim } 69292915Sdim} 70292915Sdim 71292915Sdimvoid ThreadPool::wait() { 72292915Sdim // Wait for all threads to complete and the queue to be empty 73292915Sdim std::unique_lock<std::mutex> LockGuard(CompletionLock); 74309124Sdim // The order of the checks for ActiveThreads and Tasks.empty() matters because 75309124Sdim // any active threads might be modifying the Tasks queue, and this would be a 76309124Sdim // race. 77292915Sdim CompletionCondition.wait(LockGuard, 78309124Sdim [&] { return !ActiveThreads && Tasks.empty(); }); 79292915Sdim} 80292915Sdim 81321369Sdimstd::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) { 82292915Sdim /// Wrap the Task in a packaged_task to return a future object. 83292915Sdim PackagedTaskTy PackagedTask(std::move(Task)); 84292915Sdim auto Future = PackagedTask.get_future(); 85292915Sdim { 86292915Sdim // Lock the queue and push the new task 87292915Sdim std::unique_lock<std::mutex> LockGuard(QueueLock); 88292915Sdim 89292915Sdim // Don't allow enqueueing after disabling the pool 90292915Sdim assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); 91292915Sdim 92292915Sdim Tasks.push(std::move(PackagedTask)); 93292915Sdim } 94292915Sdim QueueCondition.notify_one(); 95292915Sdim return Future.share(); 96292915Sdim} 97292915Sdim 98292915Sdim// The destructor joins all threads, waiting for completion. 99292915SdimThreadPool::~ThreadPool() { 100292915Sdim { 101292915Sdim std::unique_lock<std::mutex> LockGuard(QueueLock); 102292915Sdim EnableFlag = false; 103292915Sdim } 104292915Sdim QueueCondition.notify_all(); 105292915Sdim for (auto &Worker : Threads) 106292915Sdim Worker.join(); 107292915Sdim} 108292915Sdim 109292915Sdim#else // LLVM_ENABLE_THREADS Disabled 110292915Sdim 111292915SdimThreadPool::ThreadPool() : ThreadPool(0) {} 112292915Sdim 113292915Sdim// No threads are launched, issue a warning if ThreadCount is not 0 114292915SdimThreadPool::ThreadPool(unsigned ThreadCount) 115292915Sdim : ActiveThreads(0) { 116292915Sdim if (ThreadCount) { 117292915Sdim errs() << "Warning: request a ThreadPool with " << ThreadCount 118292915Sdim << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; 119292915Sdim } 120292915Sdim} 121292915Sdim 122292915Sdimvoid ThreadPool::wait() { 123292915Sdim // Sequential implementation running the tasks 124292915Sdim while (!Tasks.empty()) { 125292915Sdim auto Task = std::move(Tasks.front()); 126292915Sdim Tasks.pop(); 127321369Sdim Task(); 128292915Sdim } 129292915Sdim} 130292915Sdim 131321369Sdimstd::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) { 132292915Sdim // Get a Future with launch::deferred execution using std::async 133292915Sdim auto Future = std::async(std::launch::deferred, std::move(Task)).share(); 134292915Sdim // Wrap the future so that both ThreadPool::wait() can operate and the 135292915Sdim // returned future can be sync'ed on. 136292915Sdim PackagedTaskTy PackagedTask([Future]() { Future.get(); }); 137292915Sdim Tasks.push(std::move(PackagedTask)); 138292915Sdim return Future; 139292915Sdim} 140292915Sdim 141292915SdimThreadPool::~ThreadPool() { 142292915Sdim wait(); 143292915Sdim} 144292915Sdim 145292915Sdim#endif 146