[llvm] 8cb1af7 - Recommit [ThreadPool] Support returning futures with results.

Florian Hahn via llvm-commits llvm-commits at lists.llvm.org
Thu Nov 25 12:10:13 PST 2021


Author: Florian Hahn
Date: 2021-11-25T20:07:53Z
New Revision: 8cb1af73c6171be009bf06b4a0d569d3882923ad

URL: https://github.com/llvm/llvm-project/commit/8cb1af73c6171be009bf06b4a0d569d3882923ad
DIFF: https://github.com/llvm/llvm-project/commit/8cb1af73c6171be009bf06b4a0d569d3882923ad.diff

LOG: Recommit [ThreadPool] Support returning futures with results.

This reverts commit 71a7c55f0f021b04b9a7303d0cd391b9161cf303.

The revert broke the build of llvm-reduce, and it is not clear that it fixes
an issue with LLVM_ENABLE_THREADS=OFF.

See discussion in https://reviews.llvm.org/D114183 for more details.

Added: 
    

Modified: 
    llvm/include/llvm/Support/ThreadPool.h
    llvm/lib/Support/ThreadPool.cpp
    llvm/unittests/Support/ThreadPool.cpp

Removed: 
    


################################################################################
diff  --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 4c41b88d60438..4f6ccc069f0fc 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -36,9 +36,6 @@ namespace llvm {
 /// for some work to become available.
 class ThreadPool {
 public:
-  using TaskTy = std::function<void()>;
-  using PackagedTaskTy = std::packaged_task<void()>;
-
   /// Construct a pool using the hardware strategy \p S for mapping hardware
   /// execution resources (threads, cores, CPUs)
   /// Defaults to using the maximum execution resources in the system, but
@@ -51,17 +48,17 @@ class ThreadPool {
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
+  inline auto async(Function &&F, Args &&...ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
-    return asyncImpl(std::move(Task));
+    return async(std::move(Task));
   }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  template <typename Function>
-  inline std::shared_future<void> async(Function &&F) {
-    return asyncImpl(std::forward<Function>(F));
+  template <typename Func>
+  auto async(Func &&F) -> std::shared_future<decltype(F())> {
+    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
   }
 
   /// Blocking wait for all the threads to complete and the queue to be empty.
@@ -74,17 +71,71 @@ class ThreadPool {
   bool isWorkerThread() const;
 
 private:
+  /// Helpers to create a promise and a callable wrapper of \p Task that sets
+  /// the result of the promise. Returns the callable and a future to access the
+  /// result.
+  template <typename ResTy>
+  static std::pair<std::function<void()>, std::future<ResTy>>
+  createTaskAndFuture(std::function<ResTy()> Task) {
+    std::shared_ptr<std::promise<ResTy>> Promise =
+        std::make_shared<std::promise<ResTy>>();
+    auto F = Promise->get_future();
+    return {
+        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
+        std::move(F)};
+  }
+  static std::pair<std::function<void()>, std::future<void>>
+  createTaskAndFuture(std::function<void()> Task) {
+    std::shared_ptr<std::promise<void>> Promise =
+        std::make_shared<std::promise<void>>();
+    auto F = Promise->get_future();
+    return {[Promise = std::move(Promise), Task]() {
+              Task();
+              Promise->set_value();
+            },
+            std::move(F)};
+  }
+
   bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
 
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  std::shared_future<void> asyncImpl(TaskTy F);
+  template <typename ResTy>
+  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
+    // Queues \p Task; the returned shared_future yields the task's result.
+#if LLVM_ENABLE_THREADS
+    /// Wrap the Task in a std::function<void()> that sets the result of the
+    /// corresponding future. Task is not used again here, so move it.
+    auto R = createTaskAndFuture(std::move(Task));
+
+    {
+      // Lock the queue and push the new task
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+      // Don't allow enqueueing after disabling the pool
+      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+      Tasks.push(std::move(R.first));
+    }
+    QueueCondition.notify_one();
+    return R.second.share();
+
+#else // LLVM_ENABLE_THREADS Disabled
+    // Get a Future with launch::deferred execution using std::async. share()
+    // returns a std::shared_future<ResTy>, which is the type stored here.
+    std::shared_future<ResTy> Future =
+        std::async(std::launch::deferred, std::move(Task)).share();
+    // Wrap the future so that both ThreadPool::wait() can operate and the
+    // returned future can be sync'ed on.
+    Tasks.push([Future]() { Future.get(); });
+    return Future;
+#endif
+  }
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<PackagedTaskTy> Tasks;
+  std::queue<std::function<void()>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;

diff  --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index 81926d8071b2d..c11e16d3cf98c 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -29,7 +29,7 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
     Threads.emplace_back([S, ThreadID, this] {
       S.apply_thread_strategy(ThreadID);
       while (true) {
-        PackagedTaskTy Task;
+        std::function<void()> Task;
         {
           std::unique_lock<std::mutex> LockGuard(QueueLock);
           // Wait for tasks to be pushed in the queue
@@ -80,23 +80,6 @@ bool ThreadPool::isWorkerThread() const {
   return false;
 }
 
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
-  /// Wrap the Task in a packaged_task to return a future object.
-  PackagedTaskTy PackagedTask(std::move(Task));
-  auto Future = PackagedTask.get_future();
-  {
-    // Lock the queue and push the new task
-    std::unique_lock<std::mutex> LockGuard(QueueLock);
-
-    // Don't allow enqueueing after disabling the pool
-    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-
-    Tasks.push(std::move(PackagedTask));
-  }
-  QueueCondition.notify_one();
-  return Future.share();
-}
-
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
   {
@@ -128,16 +111,6 @@ void ThreadPool::wait() {
   }
 }
 
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
-  // Get a Future with launch::deferred execution using std::async
-  auto Future = std::async(std::launch::deferred, std::move(Task)).share();
-  // Wrap the future so that both ThreadPool::wait() can operate and the
-  // returned future can be sync'ed on.
-  PackagedTaskTy PackagedTask([Future]() { Future.get(); });
-  Tasks.push(std::move(PackagedTask));
-  return Future;
-}
-
 ThreadPool::~ThreadPool() { wait(); }
 
 #endif

diff  --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
index a560d5069bff9..b958b4e6dce83 100644
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -151,6 +151,31 @@ TEST_F(ThreadPoolTest, GetFuture) {
   ASSERT_EQ(2, i.load());
 }
 
+TEST_F(ThreadPoolTest, GetFutureWithResult) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool(hardware_concurrency(2));
+  auto F1 = Pool.async([] { return 1; });
+  auto F2 = Pool.async([] { return 2; });
+
+  setMainThreadReady();
+  Pool.wait();
+  ASSERT_EQ(1, F1.get());
+  ASSERT_EQ(2, F2.get());
+}
+
+TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
+  CHECK_UNSUPPORTED();
+  ThreadPool Pool(hardware_concurrency(2));
+  auto Fn = [](int x) { return x; };
+  auto F1 = Pool.async(Fn, 1);
+  auto F2 = Pool.async(Fn, 2);
+
+  setMainThreadReady();
+  Pool.wait();
+  ASSERT_EQ(1, F1.get());
+  ASSERT_EQ(2, F2.get());
+}
+
 TEST_F(ThreadPoolTest, PoolDestruction) {
   CHECK_UNSUPPORTED();
   // Test that we are waiting on destruction


        


More information about the llvm-commits mailing list