[llvm] r335440 - Add a TaskQueue that can serialize work on a ThreadPool.
Zachary Turner via llvm-commits
llvm-commits at lists.llvm.org
Sun Jun 24 20:13:10 PDT 2018
Author: zturner
Date: Sun Jun 24 20:13:09 2018
New Revision: 335440
URL: http://llvm.org/viewvc/llvm-project?rev=335440&view=rev
Log:
Add a TaskQueue that can serialize work on a ThreadPool.
We have ThreadPool, which can execute work asynchronously on N
background threads, but sometimes you need to make sure the work
is executed asynchronously but also serially. That is, if task
B is enqueued after task A, then task B should not begin until
task A has completed. This patch adds such a class.
Differential Revision: https://reviews.llvm.org/D48240
Added:
llvm/trunk/include/llvm/Support/TaskQueue.h
llvm/trunk/unittests/Support/TaskQueueTest.cpp
Modified:
llvm/trunk/unittests/Support/CMakeLists.txt
Added: llvm/trunk/include/llvm/Support/TaskQueue.h
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/include/llvm/Support/TaskQueue.h?rev=335440&view=auto
==============================================================================
--- llvm/trunk/include/llvm/Support/TaskQueue.h (added)
+++ llvm/trunk/include/llvm/Support/TaskQueue.h Sun Jun 24 20:13:09 2018
@@ -0,0 +1,137 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/ThreadPool.h"
+#include "llvm/Support/thread.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes serialized work on a user-defined Thread Pool. It
+/// guarantees that if task B is enqueued after task A, task B begins after
+/// task A completes and there is no overlap between the two.
+class TaskQueue {
+ // Because we don't have init capture to use move-only local variables that
+ // are captured into a lambda, we create the promise inside an explicit
+ // callable struct. We want to do as much of the wrapping in the
+ // type-specialized domain (before type erasure) and then erase this into a
+ // std::function.
+ template <typename Callable> struct Task {
+ using ResultTy = typename std::result_of<Callable()>::type;
+ explicit Task(Callable C, TaskQueue &Parent)
+ : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
+ Parent(&Parent) {}
+
+ template <typename T> void invokeCallbackAndSetPromise() {
+ P->set_value(C());
+ }
+
+ template <> void invokeCallbackAndSetPromise<void>() {
+ C();
+ P->set_value();
+ }
+
+ void operator()() noexcept {
+ invokeCallbackAndSetPromise<ResultTy>();
+ Parent->completeTask();
+ }
+
+ Callable C;
+ std::shared_ptr<std::promise<ResultTy>> P;
+ TaskQueue *Parent;
+ };
+
+public:
+ /// Construct a task queue with no work.
+ TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
+
+ /// Blocking destructor: the queue will wait for all work to complete.
+ ~TaskQueue() {
+ Scheduler.wait();
+ assert(Tasks.empty());
+ }
+
+ /// Asynchronous submission of a task to the queue. The returned future can be
+ /// used to wait for the task (and all previous tasks that have not yet
+ /// completed) to finish.
+ template <typename Callable>
+ std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
+#if !LLVM_ENABLE_THREADS
+ static_assert(false,
+ "TaskQueue requires building with LLVM_ENABLE_THREADS!");
+#endif
+ Task<Callable> T{std::move(C), *this};
+ using ResultTy = typename std::result_of<Callable()>::type;
+ std::future<ResultTy> F = T.P->get_future();
+ {
+ std::lock_guard<std::mutex> Lock(QueueLock);
+ // If there's already a task in flight, just queue this one up. If
+ // there is not a task in flight, bypass the queue and schedule this
+ // task immediately.
+ if (IsTaskInFlight)
+ Tasks.push_back(std::move(T));
+ else {
+ Scheduler.async(std::move(T));
+ IsTaskInFlight = true;
+ }
+ }
+ return std::move(F);
+ }
+
+private:
+ void completeTask() {
+ // We just completed a task. If there are no more tasks in the queue,
+ // update IsTaskInFlight to false and stop doing work. Otherwise
+ // schedule the next task (while not holding the lock).
+ std::function<void()> Continuation;
+ {
+ std::lock_guard<std::mutex> Lock(QueueLock);
+ if (Tasks.empty()) {
+ IsTaskInFlight = false;
+ return;
+ }
+
+ Continuation = std::move(Tasks.front());
+ Tasks.pop_front();
+ }
+ Scheduler.async(std::move(Continuation));
+ }
+
+ /// The thread pool on which to run the work.
+ ThreadPool &Scheduler;
+
+ /// State which indicates whether the queue currently is currently processing
+ /// any work.
+ bool IsTaskInFlight = false;
+
+ /// Mutex for synchronizing access to the Tasks array.
+ std::mutex QueueLock;
+
+ /// Tasks waiting for execution in the queue.
+ std::deque<std::function<void()>> Tasks;
+};
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H
Modified: llvm/trunk/unittests/Support/CMakeLists.txt
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/unittests/Support/CMakeLists.txt?rev=335440&r1=335439&r2=335440&view=diff
==============================================================================
--- llvm/trunk/unittests/Support/CMakeLists.txt (original)
+++ llvm/trunk/unittests/Support/CMakeLists.txt Sun Jun 24 20:13:09 2018
@@ -53,6 +53,7 @@ add_llvm_unittest(SupportTests
SwapByteOrderTest.cpp
TarWriterTest.cpp
TargetParserTest.cpp
+ TaskQueueTest.cpp
ThreadLocalTest.cpp
ThreadPool.cpp
Threading.cpp
Added: llvm/trunk/unittests/Support/TaskQueueTest.cpp
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/unittests/Support/TaskQueueTest.cpp?rev=335440&view=auto
==============================================================================
--- llvm/trunk/unittests/Support/TaskQueueTest.cpp (added)
+++ llvm/trunk/unittests/Support/TaskQueueTest.cpp Sun Jun 24 20:13:09 2018
@@ -0,0 +1,105 @@
+//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+#if LLVM_ENABLE_THREADS
+class TaskQueueTest : public testing::Test {
+protected:
+ TaskQueueTest() {}
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+ ThreadPool TP(1);
+ TaskQueue TQ(TP);
+ std::atomic<int> X = 0;
+ std::atomic<int> Y = 0;
+ std::atomic<int> Z = 0;
+
+ std::mutex M1, M2, M3;
+ std::unique_lock<std::mutex> L1(M1);
+ std::unique_lock<std::mutex> L2(M2);
+ std::unique_lock<std::mutex> L3(M3);
+
+ std::future<void> F1 = TQ.async([&] {
+ std::unique_lock<std::mutex> Lock(M1);
+ ++X;
+ });
+ std::future<void> F2 = TQ.async([&] {
+ std::unique_lock<std::mutex> Lock(M2);
+ ++Y;
+ });
+ std::future<void> F3 = TQ.async([&] {
+ std::unique_lock<std::mutex> Lock(M3);
+ ++Z;
+ });
+
+ L1.unlock();
+ F1.wait();
+ ASSERT_EQ(1, X);
+ ASSERT_EQ(0, Y);
+ ASSERT_EQ(0, Z);
+
+ L2.unlock();
+ F2.wait();
+ ASSERT_EQ(1, X);
+ ASSERT_EQ(1, Y);
+ ASSERT_EQ(0, Z);
+
+ L3.unlock();
+ F3.wait();
+ ASSERT_EQ(1, X);
+ ASSERT_EQ(1, Y);
+ ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+ ThreadPool TP(1);
+ TaskQueue TQ(TP);
+ std::atomic<int> X = 0;
+ std::atomic<int> Y = 0;
+ std::atomic<int> Z = 0;
+ std::mutex M;
+
+ std::unique_lock<std::mutex> Lock(M);
+
+ std::future<void> F1 = TQ.async([&] { ++X; });
+ std::future<void> F2 = TQ.async([&] { ++Y; });
+ std::future<void> F3 = TQ.async([&M, &Z] {
+ std::unique_lock<std::mutex> Lock(M);
+ ++Z;
+ });
+
+ F2.wait();
+ ASSERT_EQ(1, X);
+ ASSERT_EQ(1, Y);
+ ASSERT_EQ(0, Z);
+
+ Lock.unlock();
+
+ F3.wait();
+ ASSERT_EQ(1, X);
+ ASSERT_EQ(1, Y);
+ ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, FutureWithReturnValue) {
+ ThreadPool TP(1);
+ TaskQueue TQ(TP);
+ std::future<std::string> F1 = TQ.async([&] { return std::string("Hello"); });
+ std::future<int> F2 = TQ.async([&] { return 42; });
+
+ ASSERT_EQ(42, F2.get());
+ ASSERT_EQ("Hello", F1.get());
+}
+#endif
More information about the llvm-commits
mailing list