[lld] [llvm] [LLD] Avoid non-deterministic relocations processing. (PR #107186)
Alexey Lapshin via llvm-commits
llvm-commits at lists.llvm.org
Thu Sep 19 13:29:10 PDT 2024
https://github.com/avl-llvm updated https://github.com/llvm/llvm-project/pull/107186
>From b5208761f1e78b5e799280da00a2449ce326f443 Mon Sep 17 00:00:00 2001
From: Alexey Lapshin <avl.lapshin at gmail.com>
Date: Tue, 3 Sep 2024 07:39:12 +0300
Subject: [PATCH] [LLD] Avoid non-deterministic relocations processing.
TaskGroup has a "sequential" mode for deterministic processing.
LLD uses this for relocation handling. But, as it places relocations
into different vectors (based on getThreadIndex), the actual locations
can be non-deterministic:
if (shard)
part.relrDyn->relocsVec[parallel::getThreadIndex()].push_back(
{&isec, isec.relocs().size() - 1});
It is necessary to use the same thread index to have the same locations.
This patch changes "sequential" mode to always execute tasks on single
thread with index 0.
Fixes https://github.com/llvm/llvm-project/issues/105958.
---
lld/ELF/Driver.cpp | 2 +-
llvm/include/llvm/Support/Parallel.h | 5 +-
llvm/lib/Support/Parallel.cpp | 87 +++++++++++++++----------
llvm/unittests/Support/ParallelTest.cpp | 7 +-
4 files changed, 60 insertions(+), 41 deletions(-)
diff --git a/lld/ELF/Driver.cpp b/lld/ELF/Driver.cpp
index 37460a7a6c8eb4..13a19b6d4148d6 100644
--- a/lld/ELF/Driver.cpp
+++ b/lld/ELF/Driver.cpp
@@ -1690,7 +1690,7 @@ static void readConfigs(opt::InputArgList &args) {
}
if (auto *arg = args.getLastArg(OPT_thinlto_jobs_eq))
config->thinLTOJobs = arg->getValue();
- config->threadCount = parallel::strategy.compute_thread_count();
+ config->threadCount = parallel::getThreadCount();
if (config->ltoPartitions == 0)
error("--lto-partitions: number of threads must be > 0");
diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
index 8170da98f15a8c..0e1f4fadcf4fe6 100644
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -96,9 +96,8 @@ class TaskGroup {
// Spawn a task, but does not wait for it to finish.
// Tasks marked with \p Sequential will be executed
- // exactly in the order which they were spawned.
- // Note: Sequential tasks may be executed on different
- // threads, but strictly in sequential order.
+ // exactly in the order which they were spawned and
+ // on the thread with index 0.
void spawn(std::function<void()> f, bool Sequential = false);
void sync() const { L.sync(); }
diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp
index a3ef3d9c621b98..61f23645d6eb49 100644
--- a/llvm/lib/Support/Parallel.cpp
+++ b/llvm/lib/Support/Parallel.cpp
@@ -50,23 +50,32 @@ class Executor {
class ThreadPoolExecutor : public Executor {
public:
explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
- ThreadCount = S.compute_thread_count();
+ ThreadCount = S.compute_thread_count() + 1;
+ Threads.reserve(ThreadCount);
+ Threads.resize(2);
+
+ {
+ std::lock_guard<std::mutex> Lock(MutexSequential);
+ auto &Thread0 = Threads[0];
+ Thread0 = std::thread([this, S]() {
+ work(S, 0, MutexSequential, CondSequential, WorkQueueSequential);
+ });
+ }
+
// Spawn all but one of the threads in another thread as spawning threads
// can take a while.
- Threads.reserve(ThreadCount);
- Threads.resize(1);
std::lock_guard<std::mutex> Lock(Mutex);
// Use operator[] before creating the thread to avoid data race in .size()
// in 'safe libc++' mode.
- auto &Thread0 = Threads[0];
- Thread0 = std::thread([this, S] {
- for (unsigned I = 1; I < ThreadCount; ++I) {
- Threads.emplace_back([=] { work(S, I); });
+ auto &Thread1 = Threads[1];
+ Thread1 = std::thread([this, S]() {
+ for (unsigned I = 2; I < ThreadCount; ++I) {
+ Threads.emplace_back([=] { work(S, I, Mutex, Cond, WorkQueue); });
if (Stop)
break;
}
ThreadsCreated.set_value();
- work(S, 0);
+ work(S, 1, Mutex, Cond, WorkQueue);
});
}
@@ -78,6 +87,7 @@ class ThreadPoolExecutor : public Executor {
Stop = true;
}
Cond.notify_all();
+ CondSequential.notify_all();
ThreadsCreated.get_future().wait();
}
@@ -99,57 +109,62 @@ class ThreadPoolExecutor : public Executor {
};
void add(std::function<void()> F, bool Sequential = false) override {
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- if (Sequential)
- WorkQueueSequential.emplace_front(std::move(F));
- else
- WorkQueue.emplace_back(std::move(F));
+ if (Sequential) {
+ addImpl<true>(F, MutexSequential, CondSequential, WorkQueueSequential);
+ return;
}
- Cond.notify_one();
+
+ addImpl<false>(F, Mutex, Cond, WorkQueue);
}
size_t getThreadCount() const override { return ThreadCount; }
private:
- bool hasSequentialTasks() const {
- return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
+ template <bool Sequential>
+ void addImpl(std::function<void()> F, std::mutex &M,
+ std::condition_variable &C,
+ std::deque<std::function<void()>> &Q) {
+ {
+ std::lock_guard<std::mutex> Lock(M);
+ if constexpr (Sequential)
+ Q.emplace_front(std::move(F));
+ else
+ Q.emplace_back(std::move(F));
+ }
+ C.notify_one();
}
- bool hasGeneralTasks() const { return !WorkQueue.empty(); }
-
- void work(ThreadPoolStrategy S, unsigned ThreadID) {
+ void work(ThreadPoolStrategy S, unsigned ThreadID, std::mutex &M,
+ std::condition_variable &C, std::deque<std::function<void()>> &Q) {
threadIndex = ThreadID;
S.apply_thread_strategy(ThreadID);
while (true) {
- std::unique_lock<std::mutex> Lock(Mutex);
- Cond.wait(Lock, [&] {
- return Stop || hasGeneralTasks() || hasSequentialTasks();
- });
+ std::unique_lock<std::mutex> Lock(M);
+ C.wait(Lock, [&] { return Stop || !Q.empty(); });
+ // Stop if requested.
if (Stop)
break;
- bool Sequential = hasSequentialTasks();
- if (Sequential)
- SequentialQueueIsLocked = true;
- else
- assert(hasGeneralTasks());
- auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
- auto Task = std::move(Queue.back());
- Queue.pop_back();
+ if (Q.empty()) {
+ Lock.unlock();
+ continue;
+ }
+
+ // Unlock queue and execute task.
+ auto Task = std::move(Q.back());
+ Q.pop_back();
Lock.unlock();
Task();
- if (Sequential)
- SequentialQueueIsLocked = false;
}
}
std::atomic<bool> Stop{false};
- std::atomic<bool> SequentialQueueIsLocked{false};
std::deque<std::function<void()>> WorkQueue;
- std::deque<std::function<void()>> WorkQueueSequential;
std::mutex Mutex;
std::condition_variable Cond;
+ std::deque<std::function<void()>> WorkQueueSequential;
+ std::mutex MutexSequential;
+ std::condition_variable CondSequential;
std::promise<void> ThreadsCreated;
std::vector<std::thread> Threads;
unsigned ThreadCount;
diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
index 0eafb9b401bee7..c6acd302945651 100644
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -99,7 +99,12 @@ TEST(Parallel, TaskGroupSequentialFor) {
{
parallel::TaskGroup tg;
for (size_t Idx = 0; Idx < 500; Idx++)
- tg.spawn([&Count, Idx]() { EXPECT_EQ(Count++, Idx); }, true);
+ tg.spawn(
+ [&Count, Idx]() {
+ EXPECT_EQ(Count++, Idx);
+ EXPECT_EQ(llvm::parallel::getThreadIndex(), 0u);
+ },
+ true);
}
EXPECT_EQ(Count, 500ul);
}
More information about the llvm-commits
mailing list