[lld] [llvm] [LLD] Avoid non-deterministic relocations processing. (PR #107186)

Alexey Lapshin via llvm-commits llvm-commits at lists.llvm.org
Thu Sep 19 13:29:10 PDT 2024


https://github.com/avl-llvm updated https://github.com/llvm/llvm-project/pull/107186

>From b5208761f1e78b5e799280da00a2449ce326f443 Mon Sep 17 00:00:00 2001
From: Alexey Lapshin <avl.lapshin at gmail.com>
Date: Tue, 3 Sep 2024 07:39:12 +0300
Subject: [PATCH] [LLD] Avoid non-deterministic relocations processing.

TaskGroup has a "sequential" mode for deterministic processing.
LLD uses this for relocation handling. However, because it places
relocations into different vectors (indexed by getThreadIndex()), the
actual locations can be non-deterministic:

    if (shard)
      part.relrDyn->relocsVec[parallel::getThreadIndex()].push_back(
          {&isec, isec.relocs().size() - 1});

It is necessary to use the same thread index to get the same locations.
This patch changes "sequential" mode to always execute tasks on a single
thread with index 0.

Fixes https://github.com/llvm/llvm-project/issues/105958.
---
 lld/ELF/Driver.cpp                      |  2 +-
 llvm/include/llvm/Support/Parallel.h    |  5 +-
 llvm/lib/Support/Parallel.cpp           | 87 +++++++++++++++----------
 llvm/unittests/Support/ParallelTest.cpp |  7 +-
 4 files changed, 60 insertions(+), 41 deletions(-)

diff --git a/lld/ELF/Driver.cpp b/lld/ELF/Driver.cpp
index 37460a7a6c8eb4..13a19b6d4148d6 100644
--- a/lld/ELF/Driver.cpp
+++ b/lld/ELF/Driver.cpp
@@ -1690,7 +1690,7 @@ static void readConfigs(opt::InputArgList &args) {
   }
   if (auto *arg = args.getLastArg(OPT_thinlto_jobs_eq))
     config->thinLTOJobs = arg->getValue();
-  config->threadCount = parallel::strategy.compute_thread_count();
+  config->threadCount = parallel::getThreadCount();
 
   if (config->ltoPartitions == 0)
     error("--lto-partitions: number of threads must be > 0");
diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
index 8170da98f15a8c..0e1f4fadcf4fe6 100644
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -96,9 +96,8 @@ class TaskGroup {
 
   // Spawn a task, but does not wait for it to finish.
   // Tasks marked with \p Sequential will be executed
-  // exactly in the order which they were spawned.
-  // Note: Sequential tasks may be executed on different
-  // threads, but strictly in sequential order.
+  // exactly in the order which they were spawned and
+  // on the thread with index 0.
   void spawn(std::function<void()> f, bool Sequential = false);
 
   void sync() const { L.sync(); }
diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp
index a3ef3d9c621b98..61f23645d6eb49 100644
--- a/llvm/lib/Support/Parallel.cpp
+++ b/llvm/lib/Support/Parallel.cpp
@@ -50,23 +50,32 @@ class Executor {
 class ThreadPoolExecutor : public Executor {
 public:
   explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
-    ThreadCount = S.compute_thread_count();
+    ThreadCount = S.compute_thread_count() + 1;
+    Threads.reserve(ThreadCount);
+    Threads.resize(2);
+
+    {
+      std::lock_guard<std::mutex> Lock(MutexSequential);
+      auto &Thread0 = Threads[0];
+      Thread0 = std::thread([this, S]() {
+        work(S, 0, MutexSequential, CondSequential, WorkQueueSequential);
+      });
+    }
+
     // Spawn all but one of the threads in another thread as spawning threads
     // can take a while.
-    Threads.reserve(ThreadCount);
-    Threads.resize(1);
     std::lock_guard<std::mutex> Lock(Mutex);
     // Use operator[] before creating the thread to avoid data race in .size()
     // in 'safe libc++' mode.
-    auto &Thread0 = Threads[0];
-    Thread0 = std::thread([this, S] {
-      for (unsigned I = 1; I < ThreadCount; ++I) {
-        Threads.emplace_back([=] { work(S, I); });
+    auto &Thread1 = Threads[1];
+    Thread1 = std::thread([this, S]() {
+      for (unsigned I = 2; I < ThreadCount; ++I) {
+        Threads.emplace_back([=] { work(S, I, Mutex, Cond, WorkQueue); });
         if (Stop)
           break;
       }
       ThreadsCreated.set_value();
-      work(S, 0);
+      work(S, 1, Mutex, Cond, WorkQueue);
     });
   }
 
@@ -78,6 +87,7 @@ class ThreadPoolExecutor : public Executor {
       Stop = true;
     }
     Cond.notify_all();
+    CondSequential.notify_all();
     ThreadsCreated.get_future().wait();
   }
 
@@ -99,57 +109,62 @@ class ThreadPoolExecutor : public Executor {
   };
 
   void add(std::function<void()> F, bool Sequential = false) override {
-    {
-      std::lock_guard<std::mutex> Lock(Mutex);
-      if (Sequential)
-        WorkQueueSequential.emplace_front(std::move(F));
-      else
-        WorkQueue.emplace_back(std::move(F));
+    if (Sequential) {
+      addImpl<true>(F, MutexSequential, CondSequential, WorkQueueSequential);
+      return;
     }
-    Cond.notify_one();
+
+    addImpl<false>(F, Mutex, Cond, WorkQueue);
   }
 
   size_t getThreadCount() const override { return ThreadCount; }
 
 private:
-  bool hasSequentialTasks() const {
-    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
+  template <bool Sequential>
+  void addImpl(std::function<void()> F, std::mutex &M,
+               std::condition_variable &C,
+               std::deque<std::function<void()>> &Q) {
+    {
+      std::lock_guard<std::mutex> Lock(M);
+      if constexpr (Sequential)
+        Q.emplace_front(std::move(F));
+      else
+        Q.emplace_back(std::move(F));
+    }
+    C.notify_one();
   }
 
-  bool hasGeneralTasks() const { return !WorkQueue.empty(); }
-
-  void work(ThreadPoolStrategy S, unsigned ThreadID) {
+  void work(ThreadPoolStrategy S, unsigned ThreadID, std::mutex &M,
+            std::condition_variable &C, std::deque<std::function<void()>> &Q) {
     threadIndex = ThreadID;
     S.apply_thread_strategy(ThreadID);
     while (true) {
-      std::unique_lock<std::mutex> Lock(Mutex);
-      Cond.wait(Lock, [&] {
-        return Stop || hasGeneralTasks() || hasSequentialTasks();
-      });
+      std::unique_lock<std::mutex> Lock(M);
+      C.wait(Lock, [&] { return Stop || !Q.empty(); });
+      // Stop if requested.
       if (Stop)
         break;
-      bool Sequential = hasSequentialTasks();
-      if (Sequential)
-        SequentialQueueIsLocked = true;
-      else
-        assert(hasGeneralTasks());
 
-      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
-      auto Task = std::move(Queue.back());
-      Queue.pop_back();
+      if (Q.empty()) {
+        Lock.unlock();
+        continue;
+      }
+
+      // Unlock queue and execute task.
+      auto Task = std::move(Q.back());
+      Q.pop_back();
       Lock.unlock();
       Task();
-      if (Sequential)
-        SequentialQueueIsLocked = false;
     }
   }
 
   std::atomic<bool> Stop{false};
-  std::atomic<bool> SequentialQueueIsLocked{false};
   std::deque<std::function<void()>> WorkQueue;
-  std::deque<std::function<void()>> WorkQueueSequential;
   std::mutex Mutex;
   std::condition_variable Cond;
+  std::deque<std::function<void()>> WorkQueueSequential;
+  std::mutex MutexSequential;
+  std::condition_variable CondSequential;
   std::promise<void> ThreadsCreated;
   std::vector<std::thread> Threads;
   unsigned ThreadCount;
diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
index 0eafb9b401bee7..c6acd302945651 100644
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -99,7 +99,12 @@ TEST(Parallel, TaskGroupSequentialFor) {
   {
     parallel::TaskGroup tg;
     for (size_t Idx = 0; Idx < 500; Idx++)
-      tg.spawn([&Count, Idx]() { EXPECT_EQ(Count++, Idx); }, true);
+      tg.spawn(
+          [&Count, Idx]() {
+            EXPECT_EQ(Count++, Idx);
+            EXPECT_EQ(llvm::parallel::getThreadIndex(), 0u);
+          },
+          true);
   }
   EXPECT_EQ(Count, 500ul);
 }



More information about the llvm-commits mailing list