[llvm] e846971 - Split the locking of the queue and the threads vector in the ThreadPool implementation

Mehdi Amini via llvm-commits llvm-commits at lists.llvm.org
Fri Dec 3 20:21:30 PST 2021


Author: Mehdi Amini
Date: 2021-12-04T04:10:24Z
New Revision: e846971811efbcf75a6cbedd9aebd58cecaf8188

URL: https://github.com/llvm/llvm-project/commit/e846971811efbcf75a6cbedd9aebd58cecaf8188
DIFF: https://github.com/llvm/llvm-project/commit/e846971811efbcf75a6cbedd9aebd58cecaf8188.diff

LOG: Split the locking of the queue and the threads vector in the ThreadPool implementation

This allows releasing the QueueLock early and creating threads
independently of the queue processing.

Differential Revision: https://reviews.llvm.org/D115078

Added: 
    

Modified: 
    llvm/include/llvm/Support/ThreadPool.h
    llvm/lib/Support/ThreadPool.cpp

Removed: 
    


################################################################################
diff  --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index e1b77a6bb554..aecff122d3cb 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -111,6 +111,7 @@ class ThreadPool {
     /// corresponding future.
     auto R = createTaskAndFuture(Task);
 
+    int requestedThreads;
     {
       // Lock the queue and push the new task
       std::unique_lock<std::mutex> LockGuard(QueueLock);
@@ -118,9 +119,10 @@ class ThreadPool {
       // Don't allow enqueueing after disabling the pool
       assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
       Tasks.push(std::move(R.first));
-      grow();
+      requestedThreads = ActiveThreads + Tasks.size();
     }
     QueueCondition.notify_one();
+    grow(requestedThreads);
     return R.second.share();
 
 #else // LLVM_ENABLE_THREADS Disabled
@@ -135,28 +137,21 @@ class ThreadPool {
   }
 
 #if LLVM_ENABLE_THREADS
-  // Maybe create a new thread and add it to Threads.
-  //
-  // Requirements:
-  //   * this->QueueLock should be owned by the calling thread prior to
-  //     calling this function. It will neither lock it nor unlock it.
-  //     Calling this function without owning QueueLock would result in data
-  //     races as this function reads Tasks and ActiveThreads.
-  //   * this->Tasks should be populated with any pending tasks. This function
-  //     uses Tasks.size() to determine whether it needs to create a new thread.
-  //   * this->ActiveThreads should be up to date as it is also used to
-  //     determine whether to create a new thread.
-  void grow();
+  // Grow to ensure that we have at least `requested` Threads, but do not go
+  // over MaxThreadCount.
+  void grow(int requested);
 #endif
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
+  /// Lock protecting access to the Threads vector.
+  mutable std::mutex ThreadsLock;
 
   /// Tasks waiting for execution in the pool.
   std::queue<std::function<void()>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
-  mutable std::mutex QueueLock;
+  std::mutex QueueLock;
   std::condition_variable QueueCondition;
 
   /// Signaling for job completion

diff  --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index b917fda5f7ef..54ea84d4bd6d 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -23,49 +23,51 @@ using namespace llvm;
 ThreadPool::ThreadPool(ThreadPoolStrategy S)
     : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
 
-void ThreadPool::grow() {
+void ThreadPool::grow(int requested) {
+  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
   if (Threads.size() >= MaxThreadCount)
     return; // Already hit the max thread pool size.
-  if (ActiveThreads + Tasks.size() <= Threads.size())
-    return; // We have enough threads for now.
-  int ThreadID = Threads.size();
-  Threads.emplace_back([this, ThreadID] {
-    Strategy.apply_thread_strategy(ThreadID);
-    while (true) {
-      std::function<void()> Task;
-      {
-        std::unique_lock<std::mutex> LockGuard(QueueLock);
-        // Wait for tasks to be pushed in the queue
-        QueueCondition.wait(LockGuard,
-                            [&] { return !EnableFlag || !Tasks.empty(); });
-        // Exit condition
-        if (!EnableFlag && Tasks.empty())
-          return;
-        // Yeah, we have a task, grab it and release the lock on the queue
-
-        // We first need to signal that we are active before popping the queue
-        // in order for wait() to properly detect that even if the queue is
-        // empty, there is still a task in flight.
-        ++ActiveThreads;
-        Task = std::move(Tasks.front());
-        Tasks.pop();
+  int newThreadCount = std::min<int>(requested, MaxThreadCount);
+  while (static_cast<int>(Threads.size()) < newThreadCount) {
+    int ThreadID = Threads.size();
+    Threads.emplace_back([this, ThreadID] {
+      Strategy.apply_thread_strategy(ThreadID);
+      while (true) {
+        std::function<void()> Task;
+        {
+          std::unique_lock<std::mutex> LockGuard(QueueLock);
+          // Wait for tasks to be pushed in the queue
+          QueueCondition.wait(LockGuard,
+                              [&] { return !EnableFlag || !Tasks.empty(); });
+          // Exit condition
+          if (!EnableFlag && Tasks.empty())
+            return;
+          // Yeah, we have a task, grab it and release the lock on the queue
+
+          // We first need to signal that we are active before popping the queue
+          // in order for wait() to properly detect that even if the queue is
+          // empty, there is still a task in flight.
+          ++ActiveThreads;
+          Task = std::move(Tasks.front());
+          Tasks.pop();
+        }
+        // Run the task we just grabbed
+        Task();
+
+        bool Notify;
+        {
+          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
+          std::lock_guard<std::mutex> LockGuard(QueueLock);
+          --ActiveThreads;
+          Notify = workCompletedUnlocked();
+        }
+        // Notify task completion if this is the last active thread, in case
+        // someone waits on ThreadPool::wait().
+        if (Notify)
+          CompletionCondition.notify_all();
       }
-      // Run the task we just grabbed
-      Task();
-
-      bool Notify;
-      {
-        // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
-        std::lock_guard<std::mutex> LockGuard(QueueLock);
-        --ActiveThreads;
-        Notify = workCompletedUnlocked();
-      }
-      // Notify task completion if this is the last active thread, in case
-      // someone waits on ThreadPool::wait().
-      if (Notify)
-        CompletionCondition.notify_all();
-    }
-  });
+    });
+  }
 }
 
 void ThreadPool::wait() {
@@ -75,7 +77,7 @@ void ThreadPool::wait() {
 }
 
 bool ThreadPool::isWorkerThread() const {
-  std::unique_lock<std::mutex> LockGuard(QueueLock);
+  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
   llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
   for (const llvm::thread &Thread : Threads)
     if (CurrentThreadId == Thread.get_id())
@@ -90,6 +92,7 @@ ThreadPool::~ThreadPool() {
     EnableFlag = false;
   }
   QueueCondition.notify_all();
+  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
   for (auto &Worker : Threads)
     Worker.join();
 }


        


More information about the llvm-commits mailing list