[llvm] r334643 - Enable ThreadPool to support tasks that return values.

Zachary Turner via llvm-commits llvm-commits at lists.llvm.org
Wed Jun 13 12:29:16 PDT 2018


Author: zturner
Date: Wed Jun 13 12:29:16 2018
New Revision: 334643

URL: http://llvm.org/viewvc/llvm-project?rev=334643&view=rev
Log:
Enable ThreadPool to support tasks that return values.

Previously ThreadPool could only queue async "jobs", i.e. work
that was done for its side effects and not for its result.  It's
useful occasionally to queue async work that returns a value.
From an API perspective, this is very intuitive.  The previous
API just returned a shared_future<void>, so all we need to do is
make it return a shared_future<T>, where T is the type of value
that the operation returns.

Making this work required a little magic, but ultimately it's not
too bad.  Instead of keeping a shared queue<packaged_task<void()>>
we just keep a shared queue<unique_ptr<TaskBase>>, where TaskBase
is a class with a pure virtual execute() method, then have a
templated derived class that stores a packaged_task<T()>.  Everything
else works out pretty cleanly.

Differential Revision: https://reviews.llvm.org/D48115

Modified:
    llvm/trunk/include/llvm/Support/ThreadPool.h
    llvm/trunk/lib/Support/ThreadPool.cpp
    llvm/trunk/unittests/Support/ThreadPool.cpp

Modified: llvm/trunk/include/llvm/Support/ThreadPool.h
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/include/llvm/Support/ThreadPool.h?rev=334643&r1=334642&r2=334643&view=diff
==============================================================================
--- llvm/trunk/include/llvm/Support/ThreadPool.h (original)
+++ llvm/trunk/include/llvm/Support/ThreadPool.h Wed Jun 13 12:29:16 2018
@@ -20,6 +20,7 @@
 #include <future>
 
 #include <atomic>
+#include <cassert>
 #include <condition_variable>
 #include <functional>
 #include <memory>
@@ -35,10 +36,21 @@ namespace llvm {
 /// The pool keeps a vector of threads alive, waiting on a condition variable
 /// for some work to become available.
 class ThreadPool {
-public:
-  using TaskTy = std::function<void()>;
-  using PackagedTaskTy = std::packaged_task<void()>;
+  struct TaskBase {
+    virtual ~TaskBase() {}
+    virtual void execute() = 0;
+  };
+
+  template <typename ReturnType> struct TypedTask : public TaskBase {
+    explicit TypedTask(std::packaged_task<ReturnType()> Task)
+        : Task(std::move(Task)) {}
+
+    void execute() override { Task(); }
+
+    std::packaged_task<ReturnType()> Task;
+  };
 
+public:
   /// Construct a pool with the number of threads found by
   /// hardware_concurrency().
   ThreadPool();
@@ -52,7 +64,8 @@ public:
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function, typename... Args>
-  inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
+  inline std::shared_future<typename std::result_of<Function(Args...)>::type>
+  async(Function &&F, Args &&... ArgList) {
     auto Task =
         std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
     return asyncImpl(std::move(Task));
@@ -61,7 +74,8 @@ public:
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
   template <typename Function>
-  inline std::shared_future<void> async(Function &&F) {
+  inline std::shared_future<typename std::result_of<Function()>::type>
+  async(Function &&F) {
     return asyncImpl(std::forward<Function>(F));
   }
 
@@ -72,13 +86,35 @@ public:
 private:
   /// Asynchronous submission of a task to the pool. The returned future can be
   /// used to wait for the task to finish and is *non-blocking* on destruction.
-  std::shared_future<void> asyncImpl(TaskTy F);
+  template <typename TaskTy>
+  std::shared_future<typename std::result_of<TaskTy()>::type>
+  asyncImpl(TaskTy &&Task) {
+    typedef decltype(Task()) ResultTy;
+
+    /// Wrap the Task in a packaged_task to return a future object.
+    std::packaged_task<ResultTy()> PackagedTask(std::move(Task));
+    auto Future = PackagedTask.get_future();
+    std::unique_ptr<TaskBase> TB =
+        llvm::make_unique<TypedTask<ResultTy>>(std::move(PackagedTask));
+
+    {
+      // Lock the queue and push the new task
+      std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+      // Don't allow enqueueing after disabling the pool
+      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+
+      Tasks.push(std::move(TB));
+    }
+    QueueCondition.notify_one();
+    return Future.share();
+  }
 
   /// Threads in flight
   std::vector<llvm::thread> Threads;
 
   /// Tasks waiting for execution in the pool.
-  std::queue<PackagedTaskTy> Tasks;
+  std::queue<std::unique_ptr<TaskBase>> Tasks;
 
   /// Locking and signaling for accessing the Tasks queue.
   std::mutex QueueLock;

Modified: llvm/trunk/lib/Support/ThreadPool.cpp
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/lib/Support/ThreadPool.cpp?rev=334643&r1=334642&r2=334643&view=diff
==============================================================================
--- llvm/trunk/lib/Support/ThreadPool.cpp (original)
+++ llvm/trunk/lib/Support/ThreadPool.cpp Wed Jun 13 12:29:16 2018
@@ -32,7 +32,7 @@ ThreadPool::ThreadPool(unsigned ThreadCo
   for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
     Threads.emplace_back([&] {
       while (true) {
-        PackagedTaskTy Task;
+        std::unique_ptr<TaskBase> Task;
         {
           std::unique_lock<std::mutex> LockGuard(QueueLock);
           // Wait for tasks to be pushed in the queue
@@ -54,7 +54,7 @@ ThreadPool::ThreadPool(unsigned ThreadCo
           Tasks.pop();
         }
         // Run the task we just grabbed
-        Task();
+        Task->execute();
 
         {
           // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
@@ -79,23 +79,6 @@ void ThreadPool::wait() {
                            [&] { return !ActiveThreads && Tasks.empty(); });
 }
 
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
-  /// Wrap the Task in a packaged_task to return a future object.
-  PackagedTaskTy PackagedTask(std::move(Task));
-  auto Future = PackagedTask.get_future();
-  {
-    // Lock the queue and push the new task
-    std::unique_lock<std::mutex> LockGuard(QueueLock);
-
-    // Don't allow enqueueing after disabling the pool
-    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-
-    Tasks.push(std::move(PackagedTask));
-  }
-  QueueCondition.notify_one();
-  return Future.share();
-}
-
 // The destructor joins all threads, waiting for completion.
 ThreadPool::~ThreadPool() {
   {

Modified: llvm/trunk/unittests/Support/ThreadPool.cpp
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/unittests/Support/ThreadPool.cpp?rev=334643&r1=334642&r2=334643&view=diff
==============================================================================
--- llvm/trunk/unittests/Support/ThreadPool.cpp (original)
+++ llvm/trunk/unittests/Support/ThreadPool.cpp Wed Jun 13 12:29:16 2018
@@ -147,6 +147,25 @@ TEST_F(ThreadPoolTest, GetFuture) {
   ASSERT_EQ(2, i.load());
 }
 
+TEST_F(ThreadPoolTest, TaskWithResult) {
+  CHECK_UNSUPPORTED();
+  // By making only 1 thread in the pool the two tasks are serialized with
+  // respect to each other, which means that the second one must return 2.
+  ThreadPool Pool{1};
+  std::atomic_int i{0};
+  Pool.async([this, &i] {
+    waitForMainThread();
+    ++i;
+  });
+  // Force the future using get()
+  std::shared_future<int> Future = Pool.async([&i] { return ++i; });
+  ASSERT_EQ(0, i.load());
+  setMainThreadReady();
+  int Result = Future.get();
+  ASSERT_EQ(2, i.load());
+  ASSERT_EQ(2, Result);
+}
+
 TEST_F(ThreadPoolTest, PoolDestruction) {
   CHECK_UNSUPPORTED();
   // Test that we are waiting on destruction




More information about the llvm-commits mailing list