[llvm] fea8c07 - [Support][Parallel] Add sequential mode to TaskGroup::spawn().
Alexey Lapshin via llvm-commits
llvm-commits at lists.llvm.org
Wed Apr 26 04:53:15 PDT 2023
Author: Alexey Lapshin
Date: 2023-04-26T13:52:26+02:00
New Revision: fea8c073561f21ac0fea7f961287bf6b7dcf9f96
URL: https://github.com/llvm/llvm-project/commit/fea8c073561f21ac0fea7f961287bf6b7dcf9f96
DIFF: https://github.com/llvm/llvm-project/commit/fea8c073561f21ac0fea7f961287bf6b7dcf9f96.diff
LOG: [Support][Parallel] Add sequential mode to TaskGroup::spawn().
This patch makes it possible to specify that some tasks should be
done in sequential order. That removes the need for a conditional
operator to separate sequential tasks from parallel ones:

TaskGroup tg;
for () {
  if (condition)            ==>   tg.spawn([](){ fn(); }, condition)
    fn();
  else
    tg.spawn([](){ fn(); });
}
It also prevents execution on the main thread, which allows adding
checks for the getThreadIndex() function discussed in D142318.
The patch also replaces std::stack with std::deque in the
ThreadPoolExecutor to have natural execution order in case
(parallel::strategy.ThreadsRequested == 1).
Differential Revision: https://reviews.llvm.org/D148728
Added:
Modified:
lld/ELF/OutputSections.cpp
lld/ELF/Relocations.cpp
llvm/include/llvm/Support/Parallel.h
llvm/lib/Support/Parallel.cpp
llvm/unittests/Support/ParallelTest.cpp
Removed:
################################################################################
diff --git a/lld/ELF/OutputSections.cpp b/lld/ELF/OutputSections.cpp
index 2251ecc83cde6..a1aec83f4ac85 100644
--- a/lld/ELF/OutputSections.cpp
+++ b/lld/ELF/OutputSections.cpp
@@ -534,7 +534,7 @@ void OutputSection::writeTo(uint8_t *buf, parallel::TaskGroup &tg) {
taskSize += sections[i]->getSize();
bool done = ++i == numSections;
if (done || taskSize >= taskSizeLimit) {
- tg.execute([=] { fn(begin, i); });
+ tg.spawn([=] { fn(begin, i); });
if (done)
break;
begin = i;
diff --git a/lld/ELF/Relocations.cpp b/lld/ELF/Relocations.cpp
index 6f2280b678b45..bda979c3066ac 100644
--- a/lld/ELF/Relocations.cpp
+++ b/lld/ELF/Relocations.cpp
@@ -1534,16 +1534,13 @@ template <class ELFT> void elf::scanRelocations() {
scanner.template scanSection<ELFT>(*s);
}
};
- if (serial)
- fn();
- else
- tg.execute(fn);
+ tg.spawn(fn, serial);
}
// Both the main thread and thread pool index 0 use getThreadIndex()==0. Be
// careful that they don't concurrently run scanSections. When serial is
// true, fn() has finished at this point, so running execute is safe.
- tg.execute([] {
+ tg.spawn([] {
RelocationScanner scanner;
for (Partition &part : partitions) {
for (EhInputSection *sec : part.ehFrame->sections)
diff --git a/llvm/include/llvm/Support/Parallel.h b/llvm/include/llvm/Support/Parallel.h
index 219197c4eb296..c9bcad69113d4 100644
--- a/llvm/include/llvm/Support/Parallel.h
+++ b/llvm/include/llvm/Support/Parallel.h
@@ -84,24 +84,11 @@ class TaskGroup {
~TaskGroup();
// Spawn a task, but does not wait for it to finish.
- void spawn(std::function<void()> f);
-
- // Similar to spawn, but execute the task immediately when ThreadsRequested ==
- // 1. The difference is to give the following pattern a more intuitive order
- // when single threading is requested.
- //
- // for (size_t begin = 0, i = 0, taskSize = 0;;) {
- // taskSize += ...
- // bool done = ++i == end;
- // if (done || taskSize >= taskSizeLimit) {
- // tg.execute([=] { fn(begin, i); });
- // if (done)
- // break;
- // begin = i;
- // taskSize = 0;
- // }
- // }
- void execute(std::function<void()> f);
+ // Tasks marked with \p Sequential will be executed
+ // exactly in the order which they were spawned.
+ // Note: Sequential tasks may be executed on different
+ // threads, but strictly in sequential order.
+ void spawn(std::function<void()> f, bool Sequential = false);
void sync() const { L.sync(); }
};
diff --git a/llvm/lib/Support/Parallel.cpp b/llvm/lib/Support/Parallel.cpp
index 0a1d82efe768f..df292eba44713 100644
--- a/llvm/lib/Support/Parallel.cpp
+++ b/llvm/lib/Support/Parallel.cpp
@@ -12,8 +12,8 @@
#include "llvm/Support/Threading.h"
#include <atomic>
+#include <deque>
#include <future>
-#include <stack>
#include <thread>
#include <vector>
@@ -39,7 +39,7 @@ namespace {
class Executor {
public:
virtual ~Executor() = default;
- virtual void add(std::function<void()> func) = 0;
+ virtual void add(std::function<void()> func, bool Sequential = false) = 0;
static Executor *getDefaultExecutor();
};
@@ -97,32 +97,56 @@ class ThreadPoolExecutor : public Executor {
static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
};
- void add(std::function<void()> F) override {
+ void add(std::function<void()> F, bool Sequential = false) override {
{
+ bool UseSequentialQueue =
+ Sequential || parallel::strategy.ThreadsRequested == 1;
std::lock_guard<std::mutex> Lock(Mutex);
- WorkStack.push(std::move(F));
+ if (UseSequentialQueue)
+ WorkQueueSequential.emplace_front(std::move(F));
+ else
+ WorkQueue.emplace_back(std::move(F));
}
Cond.notify_one();
}
private:
+ bool hasSequentialTasks() const {
+ return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
+ }
+
+ bool hasGeneralTasks() const { return !WorkQueue.empty(); }
+
void work(ThreadPoolStrategy S, unsigned ThreadID) {
threadIndex = ThreadID;
S.apply_thread_strategy(ThreadID);
while (true) {
std::unique_lock<std::mutex> Lock(Mutex);
- Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
+ Cond.wait(Lock, [&] {
+ return Stop || hasGeneralTasks() || hasSequentialTasks();
+ });
if (Stop)
break;
- auto Task = std::move(WorkStack.top());
- WorkStack.pop();
+ bool Sequential = hasSequentialTasks();
+ if (Sequential)
+ SequentialQueueIsLocked = true;
+ else
+ assert(hasGeneralTasks());
+
+ auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
+ auto Task = std::move(Queue.back());
+ Queue.pop_back();
Lock.unlock();
Task();
+ if (Sequential)
+ SequentialQueueIsLocked = false;
}
}
std::atomic<bool> Stop{false};
- std::stack<std::function<void()>> WorkStack;
+ std::atomic<bool> SequentialQueueIsLocked{false};
+ std::deque<std::function<void()>> WorkQueue;
+ std::deque<std::function<void()>> WorkQueueSequential;
std::mutex Mutex;
std::condition_variable Cond;
std::promise<void> ThreadsCreated;
@@ -172,26 +196,22 @@ TaskGroup::~TaskGroup() {
--TaskGroupInstances;
}
-void TaskGroup::spawn(std::function<void()> F) {
+void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
if (Parallel) {
L.inc();
- detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
- F();
- L.dec();
- });
+ detail::Executor::getDefaultExecutor()->add(
+ [&, F = std::move(F)] {
+ F();
+ L.dec();
+ },
+ Sequential);
return;
}
#endif
F();
}
-void TaskGroup::execute(std::function<void()> F) {
- if (parallel::strategy.ThreadsRequested == 1)
- F();
- else
- spawn(F);
-}
} // namespace parallel
} // namespace llvm
diff --git a/llvm/unittests/Support/ParallelTest.cpp b/llvm/unittests/Support/ParallelTest.cpp
index bbf27dfab437e..2384a4d2b7411 100644
--- a/llvm/unittests/Support/ParallelTest.cpp
+++ b/llvm/unittests/Support/ParallelTest.cpp
@@ -92,4 +92,14 @@ TEST(Parallel, ForEachError) {
EXPECT_EQ(errText, std::string("asdf\nasdf\nasdf"));
}
+TEST(Parallel, TaskGroupSequentialFor) {
+ size_t Count = 0;
+ {
+ parallel::TaskGroup tg;
+ for (size_t Idx = 0; Idx < 500; Idx++)
+ tg.spawn([&Count, Idx]() { EXPECT_EQ(Count++, Idx); }, true);
+ }
+ EXPECT_EQ(Count, 500ul);
+}
+
#endif
More information about the llvm-commits
mailing list