[llvm] r335440 - Add a TaskQueue that can serialize work on a ThreadPool.

Zachary Turner via llvm-commits llvm-commits at lists.llvm.org
Sun Jun 24 20:13:10 PDT 2018


Author: zturner
Date: Sun Jun 24 20:13:09 2018
New Revision: 335440

URL: http://llvm.org/viewvc/llvm-project?rev=335440&view=rev
Log:
Add a TaskQueue that can serialize work on a ThreadPool.

We have ThreadPool, which can execute work asynchronously on N
background threads, but sometimes you need to make sure the work
is executed asynchronously but also serially.  That is, if task
B is enqueued after task A, then task B should not begin until
task A has completed.  This patch adds such a class.

Differential Revision: https://reviews.llvm.org/D48240

Added:
    llvm/trunk/include/llvm/Support/TaskQueue.h
    llvm/trunk/unittests/Support/TaskQueueTest.cpp
Modified:
    llvm/trunk/unittests/Support/CMakeLists.txt

Added: llvm/trunk/include/llvm/Support/TaskQueue.h
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/include/llvm/Support/TaskQueue.h?rev=335440&view=auto
==============================================================================
--- llvm/trunk/include/llvm/Support/TaskQueue.h (added)
+++ llvm/trunk/include/llvm/Support/TaskQueue.h Sun Jun 24 20:13:09 2018
@@ -0,0 +1,137 @@
+//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file defines a crude C++11 based task queue.
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_SUPPORT_TASK_QUEUE_H
+#define LLVM_SUPPORT_TASK_QUEUE_H
+
+#include "llvm/Config/llvm-config.h"
+#include "llvm/Support/ThreadPool.h"
+#include "llvm/Support/thread.h"
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <utility>
+
+namespace llvm {
+/// TaskQueue executes serialized work on a user-defined Thread Pool.  It
+/// guarantees that if task B is enqueued after task A, task B begins after
+/// task A completes and there is no overlap between the two.
+class TaskQueue {
+  // Because we don't have init capture to use move-only local variables that
+  // are captured into a lambda, we create the promise inside an explicit
+  // callable struct. We want to do as much of the wrapping in the
+  // type-specialized domain (before type erasure) and then erase this into a
+  // std::function.
+  template <typename Callable> struct Task {
+    using ResultTy = typename std::result_of<Callable()>::type;
+    explicit Task(Callable C, TaskQueue &Parent)
+        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
+          Parent(&Parent) {}
+
+    template <typename T> void invokeCallbackAndSetPromise() {
+      P->set_value(C());
+    }
+
+    template <> void invokeCallbackAndSetPromise<void>() {
+      C();
+      P->set_value();
+    }
+
+    void operator()() noexcept {
+      invokeCallbackAndSetPromise<ResultTy>();
+      Parent->completeTask();
+    }
+
+    Callable C;
+    std::shared_ptr<std::promise<ResultTy>> P;
+    TaskQueue *Parent;
+  };
+
+public:
+  /// Construct a task queue with no work.
+  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }
+
+  /// Blocking destructor: the queue will wait for all work to complete.
+  ~TaskQueue() {
+    Scheduler.wait();
+    assert(Tasks.empty());
+  }
+
+  /// Asynchronous submission of a task to the queue. The returned future can be
+  /// used to wait for the task (and all previous tasks that have not yet
+  /// completed) to finish.
+  template <typename Callable>
+  std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
+#if !LLVM_ENABLE_THREADS
+    static_assert(false,
+                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
+#endif
+    Task<Callable> T{std::move(C), *this};
+    using ResultTy = typename std::result_of<Callable()>::type;
+    std::future<ResultTy> F = T.P->get_future();
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      // If there's already a task in flight, just queue this one up.  If
+      // there is not a task in flight, bypass the queue and schedule this
+      // task immediately.
+      if (IsTaskInFlight)
+        Tasks.push_back(std::move(T));
+      else {
+        Scheduler.async(std::move(T));
+        IsTaskInFlight = true;
+      }
+    }
+    return std::move(F);
+  }
+
+private:
+  void completeTask() {
+    // We just completed a task.  If there are no more tasks in the queue,
+    // update IsTaskInFlight to false and stop doing work.  Otherwise
+    // schedule the next task (while not holding the lock).
+    std::function<void()> Continuation;
+    {
+      std::lock_guard<std::mutex> Lock(QueueLock);
+      if (Tasks.empty()) {
+        IsTaskInFlight = false;
+        return;
+      }
+
+      Continuation = std::move(Tasks.front());
+      Tasks.pop_front();
+    }
+    Scheduler.async(std::move(Continuation));
+  }
+
+  /// The thread pool on which to run the work.
+  ThreadPool &Scheduler;
+
+  /// State which indicates whether the queue currently is currently processing
+  /// any work.
+  bool IsTaskInFlight = false;
+
+  /// Mutex for synchronizing access to the Tasks array.
+  std::mutex QueueLock;
+
+  /// Tasks waiting for execution in the queue.
+  std::deque<std::function<void()>> Tasks;
+};
+} // namespace llvm
+
+#endif // LLVM_SUPPORT_TASK_QUEUE_H

Modified: llvm/trunk/unittests/Support/CMakeLists.txt
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/unittests/Support/CMakeLists.txt?rev=335440&r1=335439&r2=335440&view=diff
==============================================================================
--- llvm/trunk/unittests/Support/CMakeLists.txt (original)
+++ llvm/trunk/unittests/Support/CMakeLists.txt Sun Jun 24 20:13:09 2018
@@ -53,6 +53,7 @@ add_llvm_unittest(SupportTests
   SwapByteOrderTest.cpp
   TarWriterTest.cpp
   TargetParserTest.cpp
+  TaskQueueTest.cpp
   ThreadLocalTest.cpp
   ThreadPool.cpp
   Threading.cpp

Added: llvm/trunk/unittests/Support/TaskQueueTest.cpp
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/unittests/Support/TaskQueueTest.cpp?rev=335440&view=auto
==============================================================================
--- llvm/trunk/unittests/Support/TaskQueueTest.cpp (added)
+++ llvm/trunk/unittests/Support/TaskQueueTest.cpp Sun Jun 24 20:13:09 2018
@@ -0,0 +1,105 @@
+//========- unittests/Support/TaskQueue.cpp - TaskQueue.h tests ------========//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "llvm/Support/TaskQueue.h"
+
+#include "gtest/gtest.h"
+
+using namespace llvm;
+
+#if LLVM_ENABLE_THREADS
+class TaskQueueTest : public testing::Test {
+protected:
+  TaskQueueTest() {}
+};
+
+TEST_F(TaskQueueTest, OrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X = 0;
+  std::atomic<int> Y = 0;
+  std::atomic<int> Z = 0;
+
+  std::mutex M1, M2, M3;
+  std::unique_lock<std::mutex> L1(M1);
+  std::unique_lock<std::mutex> L2(M2);
+  std::unique_lock<std::mutex> L3(M3);
+
+  std::future<void> F1 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M1);
+    ++X;
+  });
+  std::future<void> F2 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M2);
+    ++Y;
+  });
+  std::future<void> F3 = TQ.async([&] {
+    std::unique_lock<std::mutex> Lock(M3);
+    ++Z;
+  });
+
+  L1.unlock();
+  F1.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(0, Y);
+  ASSERT_EQ(0, Z);
+
+  L2.unlock();
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(0, Z);
+
+  L3.unlock();
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, UnOrderedFutures) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::atomic<int> X = 0;
+  std::atomic<int> Y = 0;
+  std::atomic<int> Z = 0;
+  std::mutex M;
+
+  std::unique_lock<std::mutex> Lock(M);
+
+  std::future<void> F1 = TQ.async([&] { ++X; });
+  std::future<void> F2 = TQ.async([&] { ++Y; });
+  std::future<void> F3 = TQ.async([&M, &Z] {
+    std::unique_lock<std::mutex> Lock(M);
+    ++Z;
+  });
+
+  F2.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(0, Z);
+
+  Lock.unlock();
+
+  F3.wait();
+  ASSERT_EQ(1, X);
+  ASSERT_EQ(1, Y);
+  ASSERT_EQ(1, Z);
+}
+
+TEST_F(TaskQueueTest, FutureWithReturnValue) {
+  ThreadPool TP(1);
+  TaskQueue TQ(TP);
+  std::future<std::string> F1 = TQ.async([&] { return std::string("Hello"); });
+  std::future<int> F2 = TQ.async([&] { return 42; });
+
+  ASSERT_EQ(42, F2.get());
+  ASSERT_EQ("Hello", F1.get());
+}
+#endif




More information about the llvm-commits mailing list