[llvm] 728b982 - ThreadPool: grow the pool only as needed
Mehdi Amini via llvm-commits
llvm-commits at lists.llvm.org
Fri Dec 3 13:47:57 PST 2021
Author: Benoit Jacob
Date: 2021-12-03T21:40:36Z
New Revision: 728b982bb2aec5a5e9c887a7b0181ee360b27b54
URL: https://github.com/llvm/llvm-project/commit/728b982bb2aec5a5e9c887a7b0181ee360b27b54
DIFF: https://github.com/llvm/llvm-project/commit/728b982bb2aec5a5e9c887a7b0181ee360b27b54.diff
LOG: ThreadPool: grow the pool only as needed
On my 96-core cloudtop 'machine', it seems unnecessary to always start
96 threads upfront... particularly as the ThreadPool is created even
with -mlir-disable-threading. Things like the resuling spew in GDB and
the obfuscated output of `(gdb) info threads` are my motivation here,
but it probably also doesn't hurt for at least some efficiency metrics to
avoid creating many threads upfront.
Reviewed By: mehdi_amini
Differential Revision: https://reviews.llvm.org/D115019
Added:
Modified:
llvm/include/llvm/Support/ThreadPool.h
llvm/lib/Support/ThreadPool.cpp
Removed:
################################################################################
diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 8d30e8e92755a..b0f4c8e04ca20 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -40,7 +40,8 @@ class ThreadPool {
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
/// accounting for the affinity mask.
- ThreadPool(ThreadPoolStrategy S = hardware_concurrency());
+ ThreadPool(ThreadPoolStrategy S = hardware_concurrency())
+ : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
/// Blocking destructor: the pool will wait for all the threads to complete.
~ThreadPool();
@@ -65,7 +66,10 @@ class ThreadPool {
/// It is an error to try to add new tasks while blocking on this call.
void wait();
- unsigned getThreadCount() const { return ThreadCount; }
+ // TODO: misleading legacy name warning!
+ // Returns the maximum number of worker threads in the pool, not the current
+ // number of threads!
+ unsigned getThreadCount() const { return MaxThreadCount; }
/// Returns true if the current thread is a worker thread of this thread pool.
bool isWorkerThread() const;
@@ -115,6 +119,7 @@ class ThreadPool {
// Don't allow enqueueing after disabling the pool
assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
Tasks.push(std::move(R.first));
+ grow();
}
QueueCondition.notify_one();
return R.second.share();
@@ -130,6 +135,21 @@ class ThreadPool {
#endif
}
+#if LLVM_ENABLE_THREADS
+ // Maybe create a new thread and add it to Threads.
+ //
+ // Requirements:
+ // * this->QueueLock should be owned by the calling thread prior to
+ // calling this function. It will neither lock it nor unlock it.
+ // Calling this function without owning QueueLock would result in data
+ // races as this function reads Tasks and ActiveThreads.
+ // * this->Tasks should be populated with any pending tasks. This function
+ // uses Tasks.size() to determine whether it needs to create a new thread.
+ // * this->ActiveThreads should be up to date as it is also used to
+ // determine whether to create a new thread.
+ void grow();
+#endif
+
/// Threads in flight
std::vector<llvm::thread> Threads;
@@ -137,7 +157,7 @@ class ThreadPool {
std::queue<std::function<void()>> Tasks;
/// Locking and signaling for accessing the Tasks queue.
- std::mutex QueueLock;
+ mutable std::mutex QueueLock;
std::condition_variable QueueCondition;
/// Signaling for job completion
@@ -151,7 +171,10 @@ class ThreadPool {
bool EnableFlag = true;
#endif
- unsigned ThreadCount;
+ const ThreadPoolStrategy Strategy;
+
+ /// Maximum number of threads to potentially grow this pool to.
+ const unsigned MaxThreadCount;
};
}
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index c11e16d3cf98c..734e8f8e274ff 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -20,50 +20,49 @@ using namespace llvm;
#if LLVM_ENABLE_THREADS
-ThreadPool::ThreadPool(ThreadPoolStrategy S)
- : ThreadCount(S.compute_thread_count()) {
- // Create ThreadCount threads that will loop forever, wait on QueueCondition
- // for tasks to be queued or the Pool to be destroyed.
- Threads.reserve(ThreadCount);
- for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
- Threads.emplace_back([S, ThreadID, this] {
- S.apply_thread_strategy(ThreadID);
- while (true) {
- std::function<void()> Task;
- {
- std::unique_lock<std::mutex> LockGuard(QueueLock);
- // Wait for tasks to be pushed in the queue
- QueueCondition.wait(LockGuard,
- [&] { return !EnableFlag || !Tasks.empty(); });
- // Exit condition
- if (!EnableFlag && Tasks.empty())
- return;
- // Yeah, we have a task, grab it and release the lock on the queue
+void ThreadPool::grow() {
+ if (Threads.size() >= MaxThreadCount)
+ return; // Already hit the max thread pool size.
+ if (ActiveThreads + Tasks.size() <= Threads.size())
+ return; // We have enough threads for now.
+ int ThreadID = Threads.size();
+ Threads.emplace_back([this, ThreadID] {
+ Strategy.apply_thread_strategy(ThreadID);
+ while (true) {
+ std::function<void()> Task;
+ {
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+ // Wait for tasks to be pushed in the queue
+ QueueCondition.wait(LockGuard,
+ [&] { return !EnableFlag || !Tasks.empty(); });
+ // Exit condition
+ if (!EnableFlag && Tasks.empty())
+ return;
+ // Yeah, we have a task, grab it and release the lock on the queue
- // We first need to signal that we are active before popping the queue
- // in order for wait() to properly detect that even if the queue is
- // empty, there is still a task in flight.
- ++ActiveThreads;
- Task = std::move(Tasks.front());
- Tasks.pop();
- }
- // Run the task we just grabbed
- Task();
+ // We first need to signal that we are active before popping the queue
+ // in order for wait() to properly detect that even if the queue is
+ // empty, there is still a task in flight.
+ ++ActiveThreads;
+ Task = std::move(Tasks.front());
+ Tasks.pop();
+ }
+ // Run the task we just grabbed
+ Task();
- bool Notify;
- {
- // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
- std::lock_guard<std::mutex> LockGuard(QueueLock);
- --ActiveThreads;
- Notify = workCompletedUnlocked();
- }
- // Notify task completion if this is the last active thread, in case
- // someone waits on ThreadPool::wait().
- if (Notify)
- CompletionCondition.notify_all();
+ bool Notify;
+ {
+ // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
+ std::lock_guard<std::mutex> LockGuard(QueueLock);
+ --ActiveThreads;
+ Notify = workCompletedUnlocked();
}
- });
- }
+ // Notify task completion if this is the last active thread, in case
+ // someone waits on ThreadPool::wait().
+ if (Notify)
+ CompletionCondition.notify_all();
+ }
+ });
}
void ThreadPool::wait() {
@@ -73,6 +72,7 @@ void ThreadPool::wait() {
}
bool ThreadPool::isWorkerThread() const {
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
for (const llvm::thread &Thread : Threads)
if (CurrentThreadId == Thread.get_id())
More information about the llvm-commits
mailing list