[llvm] 71a7c55 - Revert "[ThreadPool] Support returning futures with results."
Daniel McIntosh via llvm-commits
llvm-commits at lists.llvm.org
Thu Nov 25 09:29:16 PST 2021
Author: Daniel McIntosh
Date: 2021-11-25T12:19:35-05:00
New Revision: 71a7c55f0f021b04b9a7303d0cd391b9161cf303
URL: https://github.com/llvm/llvm-project/commit/71a7c55f0f021b04b9a7303d0cd391b9161cf303
DIFF: https://github.com/llvm/llvm-project/commit/71a7c55f0f021b04b9a7303d0cd391b9161cf303.diff
LOG: Revert "[ThreadPool] Support returning futures with results."
This reverts commit 6149e57dc1313d32c85524f8009a1249e0b8f4d1.
The offending commit broke building with LLVM_ENABLE_THREADS=OFF.
Added:
Modified:
llvm/include/llvm/Support/ThreadPool.h
llvm/lib/Support/ThreadPool.cpp
llvm/unittests/Support/ThreadPool.cpp
Removed:
################################################################################
diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h
index 4f6ccc069f0fc..4c41b88d60438 100644
--- a/llvm/include/llvm/Support/ThreadPool.h
+++ b/llvm/include/llvm/Support/ThreadPool.h
@@ -36,6 +36,9 @@ namespace llvm {
/// for some work to become available.
class ThreadPool {
public:
+ using TaskTy = std::function<void()>;
+ using PackagedTaskTy = std::packaged_task<void()>;
+
/// Construct a pool using the hardware strategy \p S for mapping hardware
/// execution resources (threads, cores, CPUs)
/// Defaults to using the maximum execution resources in the system, but
@@ -48,17 +51,17 @@ class ThreadPool {
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
template <typename Function, typename... Args>
- inline auto async(Function &&F, Args &&...ArgList) {
+ inline std::shared_future<void> async(Function &&F, Args &&... ArgList) {
auto Task =
std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
- return async(std::move(Task));
+ return asyncImpl(std::move(Task));
}
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
- template <typename Func>
- auto async(Func &&F) -> std::shared_future<decltype(F())> {
- return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
+ template <typename Function>
+ inline std::shared_future<void> async(Function &&F) {
+ return asyncImpl(std::forward<Function>(F));
}
/// Blocking wait for all the threads to complete and the queue to be empty.
@@ -71,71 +74,17 @@ class ThreadPool {
bool isWorkerThread() const;
private:
- /// Helpers to create a promise and a callable wrapper of \p Task that sets
- /// the result of the promise. Returns the callable and a future to access the
- /// result.
- template <typename ResTy>
- static std::pair<std::function<void()>, std::future<ResTy>>
- createTaskAndFuture(std::function<ResTy()> Task) {
- std::shared_ptr<std::promise<ResTy>> Promise =
- std::make_shared<std::promise<ResTy>>();
- auto F = Promise->get_future();
- return {
- [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
- std::move(F)};
- }
- static std::pair<std::function<void()>, std::future<void>>
- createTaskAndFuture(std::function<void()> Task) {
- std::shared_ptr<std::promise<void>> Promise =
- std::make_shared<std::promise<void>>();
- auto F = Promise->get_future();
- return {[Promise = std::move(Promise), Task]() {
- Task();
- Promise->set_value();
- },
- std::move(F)};
- }
-
bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
- template <typename ResTy>
- std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
-
-#if LLVM_ENABLE_THREADS
- /// Wrap the Task in a std::function<void()> that sets the result of the
- /// corresponding future.
- auto R = createTaskAndFuture(Task);
-
- {
- // Lock the queue and push the new task
- std::unique_lock<std::mutex> LockGuard(QueueLock);
-
- // Don't allow enqueueing after disabling the pool
- assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
- Tasks.push(std::move(R.first));
- }
- QueueCondition.notify_one();
- return R.second.share();
-
-#else // LLVM_ENABLE_THREADS Disabled
-
- // Get a Future with launch::deferred execution using std::async
- std::future<void> Future =
- std::async(std::launch::deferred, std::move(Task)).share();
- // Wrap the future so that both ThreadPool::wait() can operate and the
- // returned future can be sync'ed on.
- Tasks.push([Future]() { Future.get(); });
- return Future;
-#endif
- }
+ std::shared_future<void> asyncImpl(TaskTy F);
/// Threads in flight
std::vector<llvm::thread> Threads;
/// Tasks waiting for execution in the pool.
- std::queue<std::function<void()>> Tasks;
+ std::queue<PackagedTaskTy> Tasks;
/// Locking and signaling for accessing the Tasks queue.
std::mutex QueueLock;
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index c11e16d3cf98c..81926d8071b2d 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -29,7 +29,7 @@ ThreadPool::ThreadPool(ThreadPoolStrategy S)
Threads.emplace_back([S, ThreadID, this] {
S.apply_thread_strategy(ThreadID);
while (true) {
- std::function<void()> Task;
+ PackagedTaskTy Task;
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
// Wait for tasks to be pushed in the queue
@@ -80,6 +80,23 @@ bool ThreadPool::isWorkerThread() const {
return false;
}
+std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
+ /// Wrap the Task in a packaged_task to return a future object.
+ PackagedTaskTy PackagedTask(std::move(Task));
+ auto Future = PackagedTask.get_future();
+ {
+ // Lock the queue and push the new task
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+ // Don't allow enqueueing after disabling the pool
+ assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
+
+ Tasks.push(std::move(PackagedTask));
+ }
+ QueueCondition.notify_one();
+ return Future.share();
+}
+
// The destructor joins all threads, waiting for completion.
ThreadPool::~ThreadPool() {
{
@@ -111,6 +128,16 @@ void ThreadPool::wait() {
}
}
+std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
+ // Get a Future with launch::deferred execution using std::async
+ auto Future = std::async(std::launch::deferred, std::move(Task)).share();
+ // Wrap the future so that both ThreadPool::wait() can operate and the
+ // returned future can be sync'ed on.
+ PackagedTaskTy PackagedTask([Future]() { Future.get(); });
+ Tasks.push(std::move(PackagedTask));
+ return Future;
+}
+
ThreadPool::~ThreadPool() { wait(); }
#endif
diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp
index b958b4e6dce83..a560d5069bff9 100644
--- a/llvm/unittests/Support/ThreadPool.cpp
+++ b/llvm/unittests/Support/ThreadPool.cpp
@@ -151,31 +151,6 @@ TEST_F(ThreadPoolTest, GetFuture) {
ASSERT_EQ(2, i.load());
}
-TEST_F(ThreadPoolTest, GetFutureWithResult) {
- CHECK_UNSUPPORTED();
- ThreadPool Pool(hardware_concurrency(2));
- auto F1 = Pool.async([] { return 1; });
- auto F2 = Pool.async([] { return 2; });
-
- setMainThreadReady();
- Pool.wait();
- ASSERT_EQ(1, F1.get());
- ASSERT_EQ(2, F2.get());
-}
-
-TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
- CHECK_UNSUPPORTED();
- ThreadPool Pool(hardware_concurrency(2));
- auto Fn = [](int x) { return x; };
- auto F1 = Pool.async(Fn, 1);
- auto F2 = Pool.async(Fn, 2);
-
- setMainThreadReady();
- Pool.wait();
- ASSERT_EQ(1, F1.get());
- ASSERT_EQ(2, F2.get());
-}
-
TEST_F(ThreadPoolTest, PoolDestruction) {
CHECK_UNSUPPORTED();
// Test that we are waiting on destruction
More information about the llvm-commits mailing list