[llvm] 71a7c55 - Revert "[ThreadPool] Support returning futures with results."

Daniel McIntosh via llvm-commits llvm-commits at lists.llvm.org
Thu Nov 25 09:29:16 PST 2021


Author: Daniel McIntosh
Date: 2021-11-25T12:19:35-05:00
New Revision: 71a7c55f0f021b04b9a7303d0cd391b9161cf303

URL: https://github.com/llvm/llvm-project/commit/71a7c55f0f021b04b9a7303d0cd391b9161cf303
DIFF: https://github.com/llvm/llvm-project/commit/71a7c55f0f021b04b9a7303d0cd391b9161cf303.diff

LOG: Revert "[ThreadPool] Support returning futures with results."

This reverts commit 6149e57dc1313d32c85524f8009a1249e0b8f4d1.

The offending commit broke building with LLVM_ENABLE_THREADS=OFF.

Added: 
    

Modified: 
    llvm/include/llvm/Support/ThreadPool.h
    llvm/lib/Support/ThreadPool.cpp
    llvm/unittests/Support/ThreadPool.cpp

Removed: 
    


################################################################################
diff  --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 4f6ccc069f0fc..4c41b88d60438 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -36,6 +36,9 @@ namespace llvm {
 /// for some work to become available.
 class ThreadPool {
 public:
+  using TaskTy = std::function<void()>;
+  using PackagedTaskTy = std::packaged_task<void()>;
+
   /// Construct a pool using the hardware strategy \p S for mapping hardware
   /// execution resources (threads, cores, CPUs)
   /// Defaults to using the maximum execution resources in the system, but
@@ -48,17 +51,17 @@ class ThreadPool {
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline auto async(Function &&F, Args &&...ArgList) {
+  inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
-    return async(std::move(Task));
+    return asyncImpl(std::move(Task));
   }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  template <typename Func>
-  auto async(Func &&F) -> std::shared_future<decltype(F())> {
-    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
+  template <typename Function>
+  inline std::shared_future<void> async(Function &&F) {
+    return asyncImpl(std::forward<Function>(F));
   }
 
   /// Blocking wait for all the threads to complete and the queue to be empty.
@@ -71,71 +74,17 @@ class ThreadPool {
   bool isWorkerThread() const;
 
 private:
-  /// Helpers to create a promise and a callable wrapper of \p Task that sets
-  /// the result of the promise. Returns the callable and a future to access the
-  /// result.
-  template <typename ResTy>
-  static std::pair<std::function<void()>, std::future<ResTy>>
-  createTaskAndFuture(std::function<ResTy()> Task) {
-    std::shared_ptr<std::promise<ResTy>> Promise =
-        std::make_shared<std::promise<ResTy>>();
-    auto F = Promise->get_future();
-    return {
-        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
-        std::move(F)};
-  }
-  static std::pair<std::function<void()>, std::future<void>>
-  createTaskAndFuture(std::function<void()> Task) {
-    std::shared_ptr<std::promise<void>> Promise =
-        std::make_shared<std::promise<void>>();
-    auto F = Promise->get_future();
-    return {[Promise = std::move(Promise), Task]() {
-              Task();
-              Promise->set_value();
-            },
-            std::move(F)};
-  }
-
   bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  template <typename ResTy>
-  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
-
-#if LLVM_ENABLE_THREADS
-    /// Wrap the Task in a std::function<void()> that sets the result of the
-    /// corresponding future.
-    auto R = createTaskAndFuture(Task);
-
-    {
-      // Lock the queue and push the new task
-      std::unique_lock<std::mutex> LockGuard(QueueLock);
-
-      // Don't allow enqueueing after disabling the pool
-      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-      Tasks.push(std::move(R.first));
-    }
-    QueueCondition.notify_one();
-    return R.second.share();
-
-#else // LLVM_ENABLE_THREADS Disabled
-
-    // Get a Future with launch::deferred execution using std::async
-    std::future<void> Future =
-        std::async(std::launch::deferred, std::move(Task)).share();
-    // Wrap the future so that both ThreadPool::wait() can operate and the
-    // returned future can be sync'ed on.
-    Tasks.push([Future]() { Future.get(); });
-    return Future;
-#endif
-  }
+  std::shared_future<void> asyncImpl(TaskTy F);
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<std::function<void()>> Tasks;
+  std::queue<PackagedTaskTy> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;

diff  --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index c11e16d3cf98c..81926d8071b2d 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -29,7 +29,7 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
     Threads.emplace_back([S, ThreadID, this] {
       S.apply_thread_strategy(ThreadID);
       while (true) {
-        std::function<void()> Task;
+        PackagedTaskTy Task;
         {
           std::unique_lock<std::mutex> LockGuard(QueueLock);
           // Wait for tasks to be pushed in the queue
@@ -80,6 +80,23 @@ bool ThreadPool::isWorkerThread() const {
   return false;
 }
 
+std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
+  /// Wrap the Task in a packaged_task to return a future object.
+  PackagedTaskTy PackagedTask(std::move(Task));
+  auto Future = PackagedTask.get_future();
+  {
+    // Lock the queue and push the new task
+    std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+    // Don't allow enqueueing after disabling the pool
+    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+
+    Tasks.push(std::move(PackagedTask));
+  }
+  QueueCondition.notify_one();
+  return Future.share();
+}
+
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
   {
@@ -111,6 +128,16 @@ void ThreadPool::wait() {
   }
 }
 
+std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
+  // Get a Future with launch::deferred execution using std::async
+  auto Future = std::async(std::launch::deferred, std::move(Task)).share();
+  // Wrap the future so that both ThreadPool::wait() can operate and the
+  // returned future can be sync'ed on.
+  PackagedTaskTy PackagedTask([Future]() { Future.get(); });
+  Tasks.push(std::move(PackagedTask));
+  return Future;
+}
+
 ThreadPool::~ThreadPool() { wait(); }
 
 #endif

diff  --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
index b958b4e6dce83..a560d5069bff9 100644
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -151,31 +151,6 @@ TEST_F(ThreadPoolTest, GetFuture) {
   ASSERT_EQ(2, i.load());
 }
 
-TEST_F(ThreadPoolTest, GetFutureWithResult) {
-  CHECK_UNSUPPORTED();
-  ThreadPool Pool(hardware_concurrency(2));
-  auto F1 = Pool.async([] { return 1; });
-  auto F2 = Pool.async([] { return 2; });
-
-  setMainThreadReady();
-  Pool.wait();
-  ASSERT_EQ(1, F1.get());
-  ASSERT_EQ(2, F2.get());
-}
-
-TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
-  CHECK_UNSUPPORTED();
-  ThreadPool Pool(hardware_concurrency(2));
-  auto Fn = [](int x) { return x; };
-  auto F1 = Pool.async(Fn, 1);
-  auto F2 = Pool.async(Fn, 2);
-
-  setMainThreadReady();
-  Pool.wait();
-  ASSERT_EQ(1, F1.get());
-  ASSERT_EQ(2, F2.get());
-}
-
 TEST_F(ThreadPoolTest, PoolDestruction) {
   CHECK_UNSUPPORTED();
   // Test that we are waiting on destruction


        


More information about the llvm-commits mailing list