[llvm] 8ef5710 - [ThreadPool] add ability to group tasks into separate groups

Luboš Luňák via llvm-commits llvm-commits at lists.llvm.org
Tue May 3 21:19:38 PDT 2022


Author: Luboš Luňák
Date: 2022-05-04T06:16:55+02:00
New Revision: 8ef5710e6303904dc9a99019bf9b1a8353397f0c

URL: https://github.com/llvm/llvm-project/commit/8ef5710e6303904dc9a99019bf9b1a8353397f0c
DIFF: https://github.com/llvm/llvm-project/commit/8ef5710e6303904dc9a99019bf9b1a8353397f0c.diff

LOG: [ThreadPool] add ability to group tasks into separate groups

This is needed for parallelizing of loading modules symbols in LLDB
(D122975). Currently LLDB can parallelize indexing symbols
when loading a module, but modules are loaded sequentially. If LLDB
index cache is enabled, this means that the cache loading is not
parallelized, even though it could. However doing that creates
a threadpool-within-threadpool situation, so the number of threads
would not be properly limited.

This change adds ThreadPoolTaskGroup as a simple type that can be
used with ThreadPool calls to put tasks into groups that can be
independently waited for (even recursively from within a task)
but still run in the same thread pool.

Differential Revision: https://reviews.llvm.org/D123225

Added: 
    

Modified: 
    llvm/include/llvm/Support/ThreadPool.h
    llvm/lib/Support/ThreadPool.cpp
    llvm/tools/llvm-profdata/llvm-profdata.cpp
    llvm/unittests/Support/ThreadPool.cpp

Removed: 
    


################################################################################
diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 868dd2819f836..5e67a312d5c7b 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -13,26 +13,42 @@
 #ifndef LLVM_SUPPORT_THREADPOOL_H
 #define LLVM_SUPPORT_THREADPOOL_H
 
+#include "llvm/ADT/DenseMap.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/RWMutex.h"
 #include "llvm/Support/Threading.h"
 #include "llvm/Support/thread.h"
 
 #include <future>
 
 #include <condition_variable>
+#include <deque>
 #include <functional>
 #include <memory>
 #include <mutex>
-#include <queue>
 #include <utility>
 
 namespace llvm {
 
+class ThreadPoolTaskGroup;
+
 /// A ThreadPool for asynchronous parallel execution on a defined number of
 /// threads.
 ///
 /// The pool keeps a vector of threads alive, waiting on a condition variable
 /// for some work to become available.
+///
+/// It is possible to reuse one thread pool for different groups of tasks
+/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
+/// the same queue, but it is possible to wait only for a specific group of
+/// tasks to finish.
+///
+/// It is also possible for worker threads to submit new tasks and wait for
+/// them. Note that this may result in a deadlock in cases such as when a task
+/// (directly or indirectly) tries to wait for its own completion, or when all
+/// available threads are used up by tasks waiting for a task that has no thread
+/// left to run on (this includes waiting on the returned future). It should be
+/// generally safe to wait() for a group as long as groups do not form a cycle.
 class ThreadPool {
 public:
   /// Construct a pool using the hardware strategy \p S for mapping hardware
@@ -47,23 +63,47 @@ class ThreadPool {
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline auto async(Function &&F, Args &&...ArgList) {
+  auto async(Function &&F, Args &&...ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
     return async(std::move(Task));
   }
 
+  /// Overload, task will be in the given task group.
+  template <typename Function, typename... Args>
+  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
+    auto Task =
+        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
+    return async(Group, std::move(Task));
+  }
+
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Func>
   auto async(Func &&F) -> std::shared_future<decltype(F())> {
-    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
+                     nullptr);
+  }
+
+  template <typename Func>
+  auto async(ThreadPoolTaskGroup &Group, Func &&F)
+      -> std::shared_future<decltype(F())> {
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
+                     &Group);
   }
 
   /// Blocking wait for all the threads to complete and the queue to be empty.
   /// It is an error to try to add new tasks while blocking on this call.
+  /// Calling wait() from a task would deadlock waiting for itself.
   void wait();
 
+  /// Blocking wait for only all the threads in the given group to complete.
+  /// It is possible to wait even inside a task, but waiting (directly or
+  /// indirectly) on itself will deadlock. If called from a task running on a
+  /// worker thread, the call may process pending tasks while waiting in order
+  /// not to waste the thread.
+  void wait(ThreadPoolTaskGroup &Group);
+
   // TODO: misleading legacy name warning!
   // Returns the maximum number of worker threads in the pool, not the current
   // number of threads!
@@ -98,12 +138,15 @@ class ThreadPool {
             std::move(F)};
   }
 
-  bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
+  /// Returns true if all tasks in the given group have finished (nullptr means
+  /// all tasks regardless of their group). QueueLock must be locked.
+  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename ResTy>
-  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
+  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
+                                      ThreadPoolTaskGroup *Group) {
 
 #if LLVM_ENABLE_THREADS
     /// Wrap the Task in a std::function<void()> that sets the result of the
@@ -117,7 +160,7 @@ class ThreadPool {
 
       // Don't allow enqueueing after disabling the pool
       assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-      Tasks.push(std::move(R.first));
+      Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
       requestedThreads = ActiveThreads + Tasks.size();
     }
     QueueCondition.notify_one();
@@ -130,7 +173,7 @@ class ThreadPool {
     auto Future = std::async(std::launch::deferred, std::move(Task)).share();
     // Wrap the future so that both ThreadPool::wait() can operate and the
     // returned future can be sync'ed on.
-    Tasks.push([Future]() { Future.get(); });
+    Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
     return Future;
 #endif
   }
@@ -139,25 +182,29 @@ class ThreadPool {
   // Grow to ensure that we have at least `requested` Threads, but do not go
   // over MaxThreadCount.
   void grow(int requested);
+
+  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
 #endif
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
   /// Lock protecting access to the Threads vector.
-  mutable std::mutex ThreadsLock;
+  mutable llvm::sys::RWMutex ThreadsLock;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<std::function<void()>> Tasks;
+  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;
   std::condition_variable QueueCondition;
 
-  /// Signaling for job completion
+  /// Signaling for job completion (all tasks or all tasks in a group).
   std::condition_variable CompletionCondition;
 
   /// Keep track of the number of thread actually busy
   unsigned ActiveThreads = 0;
+  /// Number of threads active for tasks in the given group (only non-zero).
+  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
 
 #if LLVM_ENABLE_THREADS // avoids warning for unused variable
   /// Signal for the destruction of the pool, asking thread to exit.
@@ -169,6 +216,34 @@ class ThreadPool {
   /// Maximum number of threads to potentially grow this pool to.
   const unsigned MaxThreadCount;
 };
-}
+
+/// A group of tasks to be run on a thread pool. Thread pool tasks in different
+/// groups can run on the same threadpool but can be waited for separately.
+/// It is even possible for tasks of one group to submit and wait for tasks
+/// of another group, as long as this does not form a loop.
+class ThreadPoolTaskGroup {
+public:
+  /// The ThreadPool argument is the thread pool to forward calls to.
+  ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
+
+  /// Blocking destructor: will wait for all the tasks in the group to complete
+  /// by calling ThreadPool::wait().
+  ~ThreadPoolTaskGroup() { wait(); }
+
+  /// Calls ThreadPool::async() for this group.
+  template <typename Function, typename... Args>
+  inline auto async(Function &&F, Args &&...ArgList) {
+    return Pool.async(*this, std::forward<Function>(F),
+                      std::forward<Args>(ArgList)...);
+  }
+
+  /// Calls ThreadPool::wait() for this group.
+  void wait() { Pool.wait(*this); }
+
+private:
+  ThreadPool &Pool;
+};
+
+} // namespace llvm
 
 #endif // LLVM_SUPPORT_THREADPOOL_H

diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index 9f92ae1c7a7c1..bb566eb66121f 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -24,11 +24,19 @@ using namespace llvm;
 
 #if LLVM_ENABLE_THREADS
 
+// A note on thread groups: Tasks are by default in no group (represented
+// by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality
+// here normally works on all tasks regardless of their group (functions
+// in that case receive nullptr ThreadPoolTaskGroup pointer as argument).
+// A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks
+// queue, and functions called to work only on tasks from one group take that
+// pointer.
+
 ThreadPool::ThreadPool(ThreadPoolStrategy S)
     : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
 
 void ThreadPool::grow(int requested) {
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedWriter LockGuard(ThreadsLock);
   if (Threads.size() >= MaxThreadCount)
     return; // Already hit the max thread pool size.
   int newThreadCount = std::min<int>(requested, MaxThreadCount);
@@ -36,52 +44,125 @@ void ThreadPool::grow(int requested) {
     int ThreadID = Threads.size();
     Threads.emplace_back([this, ThreadID] {
       Strategy.apply_thread_strategy(ThreadID);
-      while (true) {
-        std::function<void()> Task;
-        {
-          std::unique_lock<std::mutex> LockGuard(QueueLock);
-          // Wait for tasks to be pushed in the queue
-          QueueCondition.wait(LockGuard,
-                              [&] { return !EnableFlag || !Tasks.empty(); });
-          // Exit condition
-          if (!EnableFlag && Tasks.empty())
-            return;
-          // Yeah, we have a task, grab it and release the lock on the queue
-
-          // We first need to signal that we are active before popping the queue
-          // in order for wait() to properly detect that even if the queue is
-          // empty, there is still a task in flight.
-          ++ActiveThreads;
-          Task = std::move(Tasks.front());
-          Tasks.pop();
-        }
-        // Run the task we just grabbed
-        Task();
-
-        bool Notify;
-        {
-          // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
-          std::lock_guard<std::mutex> LockGuard(QueueLock);
-          --ActiveThreads;
-          Notify = workCompletedUnlocked();
-        }
-        // Notify task completion if this is the last active thread, in case
-        // someone waits on ThreadPool::wait().
-        if (Notify)
-          CompletionCondition.notify_all();
-      }
+      processTasks(nullptr);
     });
   }
 }
 
+#ifndef NDEBUG
+// The group of the tasks run by the current thread.
+static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *>
+    *CurrentThreadTaskGroups = nullptr;
+#endif
+
+// WaitingForGroup == nullptr means all tasks regardless of their group.
+void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
+  while (true) {
+    std::function<void()> Task;
+    ThreadPoolTaskGroup *GroupOfTask;
+    {
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+      bool workCompletedForGroup = false; // Result of workCompletedUnlocked()
+      // Wait for tasks to be pushed in the queue
+      QueueCondition.wait(LockGuard, [&] {
+        return !EnableFlag || !Tasks.empty() ||
+               (WaitingForGroup != nullptr &&
+                (workCompletedForGroup =
+                     workCompletedUnlocked(WaitingForGroup)));
+      });
+      // Exit condition
+      if (!EnableFlag && Tasks.empty())
+        return;
+      if (WaitingForGroup != nullptr && workCompletedForGroup)
+        return;
+      // Yeah, we have a task, grab it and release the lock on the queue
+
+      // We first need to signal that we are active before popping the queue
+      // in order for wait() to properly detect that even if the queue is
+      // empty, there is still a task in flight.
+      ++ActiveThreads;
+      Task = std::move(Tasks.front().first);
+      GroupOfTask = Tasks.front().second;
+      // Need to count active threads in each group separately, ActiveThreads
+      // would never be 0 if waiting for another group inside a wait.
+      if (GroupOfTask != nullptr)
+        ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item
+      Tasks.pop_front();
+    }
+#ifndef NDEBUG
+    if (CurrentThreadTaskGroups == nullptr)
+      CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>;
+    CurrentThreadTaskGroups->push_back(GroupOfTask);
+#endif
+
+    // Run the task we just grabbed
+    Task();
+
+#ifndef NDEBUG
+    CurrentThreadTaskGroups->pop_back();
+#endif
+
+    bool Notify;
+    bool NotifyGroup;
+    {
+      // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
+      std::lock_guard<std::mutex> LockGuard(QueueLock);
+      --ActiveThreads;
+      if (GroupOfTask != nullptr) {
+        auto A = ActiveGroups.find(GroupOfTask);
+        if (--(A->second) == 0)
+          ActiveGroups.erase(A);
+      }
+      Notify = workCompletedUnlocked(GroupOfTask);
+      NotifyGroup = GroupOfTask != nullptr && Notify;
+    }
+    // Notify task completion if this is the last active thread, in case
+    // someone waits on ThreadPool::wait().
+    if (Notify)
+      CompletionCondition.notify_all();
+    // If this was a task in a group, notify also threads waiting for tasks
+    // in this function on QueueCondition, to make a recursive wait() return
+    // after the group it's been waiting for has finished.
+    if (NotifyGroup)
+      QueueCondition.notify_all();
+  }
+}
+
+bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {
+  if (Group == nullptr)
+    return !ActiveThreads && Tasks.empty();
+  return ActiveGroups.count(Group) == 0 &&
+         !llvm::any_of(Tasks,
+                       [Group](const auto &T) { return T.second == Group; });
+}
+
 void ThreadPool::wait() {
+  assert(!isWorkerThread()); // Would deadlock waiting for itself.
   // Wait for all threads to complete and the queue to be empty
   std::unique_lock<std::mutex> LockGuard(QueueLock);
-  CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
+  CompletionCondition.wait(LockGuard,
+                           [&] { return workCompletedUnlocked(nullptr); });
+}
+
+void ThreadPool::wait(ThreadPoolTaskGroup &Group) {
+  // Wait for all threads in the group to complete.
+  if (!isWorkerThread()) {
+    std::unique_lock<std::mutex> LockGuard(QueueLock);
+    CompletionCondition.wait(LockGuard,
+                             [&] { return workCompletedUnlocked(&Group); });
+    return;
+  }
+  // Make sure to not deadlock waiting for oneself.
+  assert(CurrentThreadTaskGroups == nullptr ||
+         !llvm::is_contained(*CurrentThreadTaskGroups, &Group));
+  // Handle the case of recursive call from another task in a different group,
+  // in which case process tasks while waiting to keep the thread busy and avoid
+  // possible deadlock.
+  processTasks(&Group);
 }
 
 bool ThreadPool::isWorkerThread() const {
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedReader LockGuard(ThreadsLock);
   llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
   for (const llvm::thread &Thread : Threads)
     if (CurrentThreadId == Thread.get_id())
@@ -96,7 +177,7 @@ ThreadPool::~ThreadPool() {
     EnableFlag = false;
   }
   QueueCondition.notify_all();
-  std::unique_lock<std::mutex> LockGuard(ThreadsLock);
+  llvm::sys::ScopedReader LockGuard(ThreadsLock);
   for (auto &Worker : Threads)
     Worker.join();
 }
@@ -115,12 +196,18 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) {
 void ThreadPool::wait() {
   // Sequential implementation running the tasks
   while (!Tasks.empty()) {
-    auto Task = std::move(Tasks.front());
-    Tasks.pop();
+    auto Task = std::move(Tasks.front().first);
+    Tasks.pop_front();
     Task();
   }
 }
 
+void ThreadPool::wait(ThreadPoolTaskGroup &) {
+  // Simply wait for all, this works even if recursive (the running task
+  // is already removed from the queue).
+  wait();
+}
+
 bool ThreadPool::isWorkerThread() const {
   report_fatal_error("LLVM compiled without multithreading");
 }

diff --git a/llvm/tools/llvm-profdata/llvm-profdata.cpp b/llvm/tools/llvm-profdata/llvm-profdata.cpp
index ce5f0a8ef994e..aea608897874b 100644
--- a/llvm/tools/llvm-profdata/llvm-profdata.cpp
+++ b/llvm/tools/llvm-profdata/llvm-profdata.cpp
@@ -38,6 +38,7 @@
 #include "llvm/Support/WithColor.h"
 #include "llvm/Support/raw_ostream.h"
 #include <algorithm>
+#include <queue>
 
 using namespace llvm;
 

diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
index b5fd81d2b18a1..fd9d7272e7e0b 100644
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -18,6 +18,9 @@
 #include "llvm/Support/TargetSelect.h"
 #include "llvm/Support/Threading.h"
 
+#include <chrono>
+#include <thread>
+
 #include "gtest/gtest.h"
 
 using namespace llvm;
@@ -29,6 +32,7 @@ class ThreadPoolTest : public testing::Test {
   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
   SmallVector<Triple::OSType, 4> UnsupportedOSs;
   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
+
 protected:
   // This is intended for platform as a temporary "XFAIL"
   bool isUnsupportedOSOrEnvironment() {
@@ -57,27 +61,45 @@ class ThreadPoolTest : public testing::Test {
   }
 
   /// Make sure this thread not progress faster than the main thread.
-  void waitForMainThread() {
-    std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
-    WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
-  }
+  void waitForMainThread() { waitForPhase(1); }
 
   /// Set the readiness of the main thread.
-  void setMainThreadReady() {
+  void setMainThreadReady() { setPhase(1); }
+
+  /// Wait until given phase is set using setPhase(); first "main" phase is 1.
+  /// See also PhaseResetHelper below.
+  void waitForPhase(int Phase) {
+    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
+    CurrentPhaseCondition.wait(
+        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
+  }
+  /// If a thread waits on another phase, the test could bail out on a failed
+  /// assertion and ThreadPool destructor would wait() on all threads, which
+  /// would deadlock on the task waiting. Create this helper to automatically
+  /// reset the phase and unblock such threads.
+  struct PhaseResetHelper {
+    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
+    ~PhaseResetHelper() { test->setPhase(-1); }
+    ThreadPoolTest *test;
+  };
+
+  /// Advance to the given phase.
+  void setPhase(int Phase) {
     {
-      std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
-      MainThreadReady = true;
+      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
+      assert(Phase == CurrentPhase + 1 || Phase < 0);
+      CurrentPhase = Phase;
     }
-    WaitMainThread.notify_all();
+    CurrentPhaseCondition.notify_all();
   }
 
-  void SetUp() override { MainThreadReady = false; }
+  void SetUp() override { CurrentPhase = 0; }
 
   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
 
-  std::condition_variable WaitMainThread;
-  std::mutex WaitMainThreadMutex;
-  bool MainThreadReady = false;
+  std::condition_variable CurrentPhaseCondition;
+  std::mutex CurrentPhaseMutex;
+  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
 };
 
 #define CHECK_UNSUPPORTED()                                                    \
@@ -194,6 +216,125 @@ TEST_F(ThreadPoolTest, PoolDestruction) {
   ASSERT_EQ(5, checked_in);
 }
 
+// Check running tasks in different groups.
+TEST_F(ThreadPoolTest, Groups) {
+  CHECK_UNSUPPORTED();
+  // Need at least two threads, as the task in group2
+  // might block a thread until all tasks in group1 finish.
+  ThreadPoolStrategy S = hardware_concurrency(2);
+  if (S.compute_thread_count() < 2)
+    return;
+  ThreadPool Pool(S);
+  PhaseResetHelper Helper(this);
+  ThreadPoolTaskGroup Group1(Pool);
+  ThreadPoolTaskGroup Group2(Pool);
+
+  // Check that waiting for an empty group is a no-op.
+  Group1.wait();
+
+  std::atomic_int checked_in1{0};
+  std::atomic_int checked_in2{0};
+
+  for (size_t i = 0; i < 5; ++i) {
+    Group1.async([this, &checked_in1] {
+      waitForMainThread();
+      ++checked_in1;
+    });
+  }
+  Group2.async([this, &checked_in2] {
+    waitForPhase(2);
+    ++checked_in2;
+  });
+  ASSERT_EQ(0, checked_in1);
+  ASSERT_EQ(0, checked_in2);
+  // Start first group and wait for it.
+  setMainThreadReady();
+  Group1.wait();
+  ASSERT_EQ(5, checked_in1);
+  // Second group has not yet finished, start it and wait for it.
+  ASSERT_EQ(0, checked_in2);
+  setPhase(2);
+  Group2.wait();
+  ASSERT_EQ(5, checked_in1);
+  ASSERT_EQ(1, checked_in2);
+}
+
+// Check recursive tasks.
+TEST_F(ThreadPoolTest, RecursiveGroups) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool;
+  ThreadPoolTaskGroup Group(Pool);
+
+  std::atomic_int checked_in1{0};
+
+  for (size_t i = 0; i < 5; ++i) {
+    Group.async([this, &Pool, &checked_in1] {
+      waitForMainThread();
+
+      ThreadPoolTaskGroup LocalGroup(Pool);
+
+      // Check that waiting for an empty group is a no-op.
+      LocalGroup.wait();
+
+      std::atomic_int checked_in2{0};
+      for (size_t i = 0; i < 5; ++i) {
+        LocalGroup.async([&checked_in2] { ++checked_in2; });
+      }
+      LocalGroup.wait();
+      ASSERT_EQ(5, checked_in2);
+
+      ++checked_in1;
+    });
+  }
+  ASSERT_EQ(0, checked_in1);
+  setMainThreadReady();
+  Group.wait();
+  ASSERT_EQ(5, checked_in1);
+}
+
+TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
+  CHECK_UNSUPPORTED();
+  ThreadPoolStrategy S = hardware_concurrency(2);
+  if (S.compute_thread_count() < 2)
+    return;
+  ThreadPool Pool(S);
+  PhaseResetHelper Helper(this);
+  ThreadPoolTaskGroup Group(Pool);
+
+  // Test that a thread calling wait() for a group and is waiting for more tasks
+  // returns when the last task finishes in a different thread while the waiting
+  // thread was waiting for more tasks to process while waiting.
+
+  // Task A runs in the first thread. It finishes and leaves
+  // the background thread waiting for more tasks.
+  Group.async([this] {
+    waitForMainThread();
+    setPhase(2);
+  });
+  // Task B is run in a second thread, it launches yet another
+  // task C in a different group, which will be handled by the waiting
+  // thread started above.
+  Group.async([this, &Pool] {
+    waitForPhase(2);
+    ThreadPoolTaskGroup LocalGroup(Pool);
+    LocalGroup.async([this] {
+      waitForPhase(3);
+      // Give the other thread enough time to check that there's no task
+      // to process and suspend waiting for a notification. This is indeed racy,
+      // but probably the best that can be done.
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    });
+    // And task B only now will wait for the tasks in the group (=task C)
+    // to finish. This test checks that it does not deadlock. If the
+    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
+    // this task B would be stuck waiting for tasks to arrive.
+    setPhase(3);
+    LocalGroup.wait();
+  });
+  setMainThreadReady();
+  Group.wait();
+}
+
 #if LLVM_ENABLE_THREADS == 1
 
 // FIXME: Skip some tests below on non-Windows because multi-socket systems


        


More information about the llvm-commits mailing list