1326949Sdim//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===// 2326949Sdim// 3353358Sdim// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4353358Sdim// See https://llvm.org/LICENSE.txt for license information. 5353358Sdim// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6326949Sdim// 7326949Sdim//===----------------------------------------------------------------------===// 8326949Sdim 9326949Sdim#include "lldb/Host/TaskPool.h" 10326949Sdim#include "lldb/Host/ThreadLauncher.h" 11353358Sdim#include "lldb/Utility/Log.h" 12326949Sdim 13344779Sdim#include <cstdint> 14344779Sdim#include <queue> 15344779Sdim#include <thread> 16326949Sdim 17326949Sdimnamespace lldb_private { 18326949Sdim 19326949Sdimnamespace { 20326949Sdimclass TaskPoolImpl { 21326949Sdimpublic: 22326949Sdim static TaskPoolImpl &GetInstance(); 23326949Sdim 24326949Sdim void AddTask(std::function<void()> &&task_fn); 25326949Sdim 26326949Sdimprivate: 27326949Sdim TaskPoolImpl(); 28326949Sdim 29326949Sdim static lldb::thread_result_t WorkerPtr(void *pool); 30326949Sdim 31326949Sdim static void Worker(TaskPoolImpl *pool); 32326949Sdim 33326949Sdim std::queue<std::function<void()>> m_tasks; 34326949Sdim std::mutex m_tasks_mutex; 35326949Sdim uint32_t m_thread_count; 36326949Sdim}; 37326949Sdim 38326949Sdim} // end of anonymous namespace 39326949Sdim 40326949SdimTaskPoolImpl &TaskPoolImpl::GetInstance() { 41326949Sdim static TaskPoolImpl g_task_pool_impl; 42326949Sdim return g_task_pool_impl; 43326949Sdim} 44326949Sdim 45326949Sdimvoid TaskPool::AddTaskImpl(std::function<void()> &&task_fn) { 46326949Sdim TaskPoolImpl::GetInstance().AddTask(std::move(task_fn)); 47326949Sdim} 48326949Sdim 49326949SdimTaskPoolImpl::TaskPoolImpl() : m_thread_count(0) {} 50326949Sdim 51326949Sdimunsigned GetHardwareConcurrencyHint() { 52341825Sdim // std::thread::hardware_concurrency may return 0 if the value is not well 53341825Sdim // defined or not computable. 54326949Sdim static const unsigned g_hardware_concurrency = 55326949Sdim std::max(1u, std::thread::hardware_concurrency()); 56326949Sdim return g_hardware_concurrency; 57326949Sdim} 58326949Sdim 59326949Sdimvoid TaskPoolImpl::AddTask(std::function<void()> &&task_fn) { 60326949Sdim const size_t min_stack_size = 8 * 1024 * 1024; 61326949Sdim 62326949Sdim std::unique_lock<std::mutex> lock(m_tasks_mutex); 63326949Sdim m_tasks.emplace(std::move(task_fn)); 64326949Sdim if (m_thread_count < GetHardwareConcurrencyHint()) { 65326949Sdim m_thread_count++; 66326949Sdim // Note that this detach call needs to happen with the m_tasks_mutex held. 67341825Sdim // This prevents the thread from exiting prematurely and triggering a linux 68341825Sdim // libc bug (https://sourceware.org/bugzilla/show_bug.cgi?id=19951). 69353358Sdim llvm::Expected<HostThread> host_thread = 70353358Sdim lldb_private::ThreadLauncher::LaunchThread( 71353358Sdim "task-pool.worker", WorkerPtr, this, min_stack_size); 72353358Sdim if (host_thread) { 73353358Sdim host_thread->Release(); 74353358Sdim } else { 75353358Sdim LLDB_LOG(lldb_private::GetLogIfAllCategoriesSet(LIBLLDB_LOG_HOST), 76353358Sdim "failed to launch host thread: {}", 77353358Sdim llvm::toString(host_thread.takeError())); 78353358Sdim } 79326949Sdim } 80326949Sdim} 81326949Sdim 82326949Sdimlldb::thread_result_t TaskPoolImpl::WorkerPtr(void *pool) { 83326949Sdim Worker((TaskPoolImpl *)pool); 84353358Sdim return {}; 85326949Sdim} 86326949Sdim 87326949Sdimvoid TaskPoolImpl::Worker(TaskPoolImpl *pool) { 88326949Sdim while (true) { 89326949Sdim std::unique_lock<std::mutex> lock(pool->m_tasks_mutex); 90326949Sdim if (pool->m_tasks.empty()) { 91326949Sdim pool->m_thread_count--; 92326949Sdim break; 93326949Sdim } 94326949Sdim 95326949Sdim std::function<void()> f = std::move(pool->m_tasks.front()); 96326949Sdim pool->m_tasks.pop(); 97326949Sdim lock.unlock(); 98326949Sdim 99326949Sdim f(); 100326949Sdim } 101326949Sdim} 102326949Sdim 103326949Sdimvoid TaskMapOverInt(size_t begin, size_t end, 104326949Sdim const llvm::function_ref<void(size_t)> &func) { 105326949Sdim const size_t num_workers = std::min<size_t>(end, GetHardwareConcurrencyHint()); 106326949Sdim std::atomic<size_t> idx{begin}; 107326949Sdim 108326949Sdim auto wrapper = [&idx, end, &func]() { 109326949Sdim while (true) { 110326949Sdim size_t i = idx.fetch_add(1); 111326949Sdim if (i >= end) 112326949Sdim break; 113326949Sdim func(i); 114326949Sdim } 115326949Sdim }; 116326949Sdim 117326949Sdim std::vector<std::future<void>> futures; 118326949Sdim futures.reserve(num_workers); 119326949Sdim for (size_t i = 0; i < num_workers; i++) 120326949Sdim futures.push_back(TaskPool::AddTask(wrapper)); 121326949Sdim for (size_t i = 0; i < num_workers; i++) 122326949Sdim futures[i].wait(); 123326949Sdim} 124326949Sdim 125326949Sdim} // namespace lldb_private 126326949Sdim 127