[llvm] 6149e57 - [ThreadPool] Support returning futures with results.
Florian Hahn via llvm-commits
llvm-commits at lists.llvm.org
Mon Nov 22 13:21:25 PST 2021
Author: Florian Hahn
Date: 2021-11-22T21:20:55Z
New Revision: 6149e57dc1313d32c85524f8009a1249e0b8f4d1
URL: https://github.com/llvm/llvm-project/commit/6149e57dc1313d32c85524f8009a1249e0b8f4d1
DIFF: https://github.com/llvm/llvm-project/commit/6149e57dc1313d32c85524f8009a1249e0b8f4d1.diff
LOG: [ThreadPool] Support returning futures with results.
This patch adjusts ThreadPool::async to return futures that wrap
the result type of the passed-in callable.
To do so, ThreadPool::asyncImpl first creates a shared promise. It then
wraps the task in a new callable that executes the task and sets the
promise's value from the task's result. This wrapper callable is what
gets added to the task queue.
Reviewed By: mehdi_amini
Differential Revision: https://reviews.llvm.org/D114183
Added:
Modified:
llvm/include/llvm/Support/ThreadPool.h
llvm/lib/Support/ThreadPool.cpp
llvm/unittests/Support/ThreadPool.cpp
Removed:
################################################################################
diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 4c41b88d60438..8d30e8e92755a 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -36,9 +36,6 @@ namespace llvm {
/// for some work to become available.
class ThreadPool {
public:
- using TaskTy = std::function<void()>;
- using PackagedTaskTy = std::packaged_task<void()>;
-
/// Construct a pool using the hardware strategy \p S for mapping hardware
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
@@ -51,17 +48,17 @@ class ThreadPool {
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function, typename... Args>
- inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
+ inline auto async(Function &&F, Args &&...ArgList) {
auto Task =
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
- return asyncImpl(std::move(Task));
+ return async(std::move(Task));
}
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
- template <typename Function>
- inline std::shared_future<void> async(Function &&F) {
- return asyncImpl(std::forward<Function>(F));
+ template <typename Func>
+ auto async(Func &&F) -> std::shared_future<decltype(F())> {
+ return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
}
/// Blocking wait for all the threads to complete and the queue to be empty.
@@ -74,17 +71,70 @@ class ThreadPool {
bool isWorkerThread() const;
private:
+ /// Helpers to create a promise and a callable wrapper of \p Task that sets
+ /// the result of the promise. Returns the callable and a future to access the
+ /// result.
+ template <typename ResTy>
+ static std::pair<std::function<void()>, std::future<ResTy>>
+ createTaskAndFuture(std::function<ResTy()> Task) {
+ std::shared_ptr<std::promise<ResTy>> Promise =
+ std::make_shared<std::promise<ResTy>>();
+ auto F = Promise->get_future();
+ return {
+ [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
+ std::move(F)};
+ }
+ static std::pair<std::function<void()>, std::future<void>>
+ createTaskAndFuture(std::function<void()> Task) {
+ std::shared_ptr<std::promise<void>> Promise =
+ std::make_shared<std::promise<void>>();
+ auto F = Promise->get_future();
+ return {[Promise = std::move(Promise), Task]() {
+ Task();
+ Promise->set_value();
+ },
+ std::move(F)};
+ }
+
bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
- std::shared_future<void> asyncImpl(TaskTy F);
+ template <typename ResTy>
+ std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
+
+#if LLVM_ENABLE_THREADS
+ /// Wrap the Task in a std::function<void()> that sets the result of the
+ /// corresponding future.
+ auto R = createTaskAndFuture(Task);
+
+ {
+ // Lock the queue and push the new task
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+ // Don't allow enqueueing after disabling the pool
+ assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+ Tasks.push(std::move(R.first));
+ }
+ QueueCondition.notify_one();
+ return R.second.share();
+
+#else // LLVM_ENABLE_THREADS Disabled
+
+ // Get a Future with launch::deferred execution using std::async
+ auto Future = std::async(std::launch::deferred, std::move(Task)).share();
+ // Wrap the future so that both ThreadPool::wait() can operate and the
+ // returned future can be sync'ed on.
+ Tasks.push([Future]() { Future.get(); });
+ return Future;
+#endif
+ }
/// Threads in flight
std::vector<llvm::thread> Threads;
/// Tasks waiting for execution in the pool.
- std::queue<PackagedTaskTy> Tasks;
+ std::queue<std::function<void()>> Tasks;
/// Locking and signaling for accessing the Tasks queue.
std::mutex QueueLock;
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index 81926d8071b2d..c11e16d3cf98c 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -29,7 +29,7 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
Threads.emplace_back([S, ThreadID, this] {
S.apply_thread_strategy(ThreadID);
while (true) {
- PackagedTaskTy Task;
+ std::function<void()> Task;
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
// Wait for tasks to be pushed in the queue
@@ -80,23 +80,6 @@ bool ThreadPool::isWorkerThread() const {
return false;
}
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
- /// Wrap the Task in a packaged_task to return a future object.
- PackagedTaskTy PackagedTask(std::move(Task));
- auto Future = PackagedTask.get_future();
- {
- // Lock the queue and push the new task
- std::unique_lock<std::mutex> LockGuard(QueueLock);
-
- // Don't allow enqueueing after disabling the pool
- assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
-
- Tasks.push(std::move(PackagedTask));
- }
- QueueCondition.notify_one();
- return Future.share();
-}
-
// The destructor joins all threads, waiting for completion.
ThreadPool::~ThreadPool() {
{
@@ -128,16 +111,6 @@ void ThreadPool::wait() {
}
}
-std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
- // Get a Future with launch::deferred execution using std::async
- auto Future = std::async(std::launch::deferred, std::move(Task)).share();
- // Wrap the future so that both ThreadPool::wait() can operate and the
- // returned future can be sync'ed on.
- PackagedTaskTy PackagedTask([Future]() { Future.get(); });
- Tasks.push(std::move(PackagedTask));
- return Future;
-}
-
ThreadPool::~ThreadPool() { wait(); }
#endif
diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
index a560d5069bff9..b958b4e6dce83 100644
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -151,6 +151,31 @@ TEST_F(ThreadPoolTest, GetFuture) {
ASSERT_EQ(2, i.load());
}
+TEST_F(ThreadPoolTest, GetFutureWithResult) {
+ CHECK_UNSUPPORTED();
+ ThreadPool Pool(hardware_concurrency(2));
+ auto F1 = Pool.async([] { return 1; });
+ auto F2 = Pool.async([] { return 2; });
+
+ setMainThreadReady();
+ Pool.wait();
+ ASSERT_EQ(1, F1.get());
+ ASSERT_EQ(2, F2.get());
+}
+
+TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
+ CHECK_UNSUPPORTED();
+ ThreadPool Pool(hardware_concurrency(2));
+ auto Fn = [](int x) { return x; };
+ auto F1 = Pool.async(Fn, 1);
+ auto F2 = Pool.async(Fn, 2);
+
+ setMainThreadReady();
+ Pool.wait();
+ ASSERT_EQ(1, F1.get());
+ ASSERT_EQ(2, F2.get());
+}
+
TEST_F(ThreadPoolTest, PoolDestruction) {
CHECK_UNSUPPORTED();
// Test that we are waiting on destruction
More information about the llvm-commits
mailing list