[llvm] fea8c07 - [Support][Parallel] Add sequential mode to TaskGroup::spawn().

Alexey Lapshin via llvm-commits llvm-commits at lists.llvm.org
Wed Apr 26 04:53:15 PDT 2023


Author: Alexey Lapshin
Date: 2023-04-26T13:52:26+02:00
New Revision: fea8c073561f21ac0fea7f961287bf6b7dcf9f96

URL: https://github.com/llvm/llvm-project/commit/fea8c073561f21ac0fea7f961287bf6b7dcf9f96
DIFF: https://github.com/llvm/llvm-project/commit/fea8c073561f21ac0fea7f961287bf6b7dcf9f96.diff

LOG: [Support][Parallel] Add sequential mode to TaskGroup::spawn().

This patch allows specifying that some tasks should be executed
in sequential order. It makes it possible to avoid using a
conditional statement to separate sequential tasks:

TaskGroup tg;
for () {
  if(condition)      ==>   tg.spawn([](){fn();}, condition)
    fn();
  else
    tg.spawn([](){fn();});
}

It also prevents execution on the main thread, which allows adding
checks for the getThreadIndex() function discussed in D142318.

The patch also replaces std::stack with std::deque in the
ThreadPoolExecutor to get a natural execution order when
parallel::strategy.ThreadsRequested == 1.

Differential Revision: https://reviews.llvm.org/D148728

Added: 
    

Modified: 
    lld/ELF/OutputSections.cpp
    lld/ELF/Relocations.cpp
    llvm/include/llvm/Support/Parallel.h
    llvm/lib/Support/Parallel.cpp
    llvm/unittests/Support/ParallelTest.cpp

Removed: 
    


################################################################################
diff --git a/lld/ELF/OutputSections.cpp b/lld/ELF/OutputSections.cpp
index 2251ecc83cde6..a1aec83f4ac85 100644
--- a/lld/ELF/OutputSections.cpp
+++ b/lld/ELF/OutputSections.cpp
@@ -534,7 +534,7 @@ void OutputSection::writeTo(uint8_t *buf, parallel::TaskGroup &tg) {
     taskSize += sections[i]->getSize();
     bool done = ++i == numSections;
     if (done || taskSize >= taskSizeLimit) {
-      tg.execute([=] { fn(begin, i); });
+      tg.spawn([=] { fn(begin, i); });
       if (done)
         break;
       begin = i;

diff --git a/lld/ELF/Relocations.cpp b/lld/ELF/Relocations.cpp
index 6f2280b678b45..bda979c3066ac 100644
--- a/lld/ELF/Relocations.cpp
+++ b/lld/ELF/Relocations.cpp
@@ -1534,16 +1534,13 @@ template <class ELFT> void elf::scanRelocations() {
           scanner.template scanSection<ELFT>(*s);
       }
     };
-    if (serial)
-      fn();
-    else
-      tg.execute(fn);
+    tg.spawn(fn, serial);
   }
 
   // Both the main thread and thread pool index 0 use getThreadIndex()==0. Be
   // careful that they don't concurrently run scanSections. When serial is
   // true, fn() has finished at this point, so running execute is safe.
-  tg.execute([] {
+  tg.spawn([] {
     RelocationScanner scanner;
     for (Partition &part : partitions) {
       for (EhInputSection *sec : part.ehFrame->sections)

diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
index 219197c4eb296..c9bcad69113d4 100644
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -84,24 +84,11 @@ class TaskGroup {
   ~TaskGroup();
 
   // Spawn a task, but does not wait for it to finish.
-  void spawn(std::function<void()> f);
-
-  // Similar to spawn, but execute the task immediately when ThreadsRequested ==
-  // 1. The difference is to give the following pattern a more intuitive order
-  // when single threading is requested.
-  //
-  // for (size_t begin = 0, i = 0, taskSize = 0;;) {
-  //   taskSize += ...
-  //   bool done = ++i == end;
-  //   if (done || taskSize >= taskSizeLimit) {
-  //     tg.execute([=] { fn(begin, i); });
-  //     if (done)
-  //       break;
-  //     begin = i;
-  //     taskSize = 0;
-  //   }
-  // }
-  void execute(std::function<void()> f);
+  // Tasks marked with \p Sequential will be executed
+  // exactly in the order which they were spawned.
+  // Note: Sequential tasks may be executed on different
+  // threads, but strictly in sequential order.
+  void spawn(std::function<void()> f, bool Sequential = false);
 
   void sync() const { L.sync(); }
 };

diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp
index 0a1d82efe768f..df292eba44713 100644
--- a/llvm/lib/Support/Parallel.cpp
+++ b/llvm/lib/Support/Parallel.cpp
@@ -12,8 +12,8 @@
 #include "llvm/Support/Threading.h"
 
 #include <atomic>
+#include <deque>
 #include <future>
-#include <stack>
 #include <thread>
 #include <vector>
 
@@ -39,7 +39,7 @@ namespace {
 class Executor {
 public:
   virtual ~Executor() = default;
-  virtual void add(std::function<void()> func) = 0;
+  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
 
   static Executor *getDefaultExecutor();
 };
@@ -97,32 +97,56 @@ class ThreadPoolExecutor : public Executor {
     static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
   };
 
-  void add(std::function<void()> F) override {
+  void add(std::function<void()> F, bool Sequential = false) override {
     {
+      bool UseSequentialQueue =
+          Sequential || parallel::strategy.ThreadsRequested == 1;
       std::lock_guard<std::mutex> Lock(Mutex);
-      WorkStack.push(std::move(F));
+      if (UseSequentialQueue)
+        WorkQueueSequential.emplace_front(std::move(F));
+      else
+        WorkQueue.emplace_back(std::move(F));
     }
     Cond.notify_one();
   }
 
 private:
+  bool hasSequentialTasks() const {
+    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
+  }
+
+  bool hasGeneralTasks() const { return !WorkQueue.empty(); }
+
   void work(ThreadPoolStrategy S, unsigned ThreadID) {
     threadIndex = ThreadID;
     S.apply_thread_strategy(ThreadID);
     while (true) {
       std::unique_lock<std::mutex> Lock(Mutex);
-      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
+      Cond.wait(Lock, [&] {
+        return Stop || hasGeneralTasks() || hasSequentialTasks();
+      });
       if (Stop)
         break;
-      auto Task = std::move(WorkStack.top());
-      WorkStack.pop();
+      bool Sequential = hasSequentialTasks();
+      if (Sequential)
+        SequentialQueueIsLocked = true;
+      else
+        assert(hasGeneralTasks());
+
+      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
+      auto Task = std::move(Queue.back());
+      Queue.pop_back();
       Lock.unlock();
       Task();
+      if (Sequential)
+        SequentialQueueIsLocked = false;
     }
   }
 
   std::atomic<bool> Stop{false};
-  std::stack<std::function<void()>> WorkStack;
+  std::atomic<bool> SequentialQueueIsLocked{false};
+  std::deque<std::function<void()>> WorkQueue;
+  std::deque<std::function<void()>> WorkQueueSequential;
   std::mutex Mutex;
   std::condition_variable Cond;
   std::promise<void> ThreadsCreated;
@@ -172,26 +196,22 @@ TaskGroup::~TaskGroup() {
   --TaskGroupInstances;
 }
 
-void TaskGroup::spawn(std::function<void()> F) {
+void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
 #if LLVM_ENABLE_THREADS
   if (Parallel) {
     L.inc();
-    detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
-      F();
-      L.dec();
-    });
+    detail::Executor::getDefaultExecutor()->add(
+        [&, F = std::move(F)] {
+          F();
+          L.dec();
+        },
+        Sequential);
     return;
   }
 #endif
   F();
 }
 
-void TaskGroup::execute(std::function<void()> F) {
-  if (parallel::strategy.ThreadsRequested == 1)
-    F();
-  else
-    spawn(F);
-}
 } // namespace parallel
 } // namespace llvm
 

diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
index bbf27dfab437e..2384a4d2b7411 100644
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -92,4 +92,14 @@ TEST(Parallel, ForEachError) {
   EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
 }
 
+TEST(Parallel, TaskGroupSequentialFor) {
+  size_t Count = 0;
+  {
+    parallel::TaskGroup tg;
+    for (size_t Idx = 0; Idx < 500; Idx++)
+      tg.spawn([&Count, Idx]() { EXPECT_EQ(Count++, Idx); }, true);
+  }
+  EXPECT_EQ(Count, 500ul);
+}
+
 #endif


        


More information about the llvm-commits mailing list