[Openmp-commits] [openmp] [OpenMP][NFC] Refactor to prepare for SDMA engine patch (PR #69351)
Jan Patrick Lehr via Openmp-commits
openmp-commits at lists.llvm.org
Tue Oct 17 09:09:13 PDT 2023
https://github.com/jplehr created https://github.com/llvm/llvm-project/pull/69351
This is a refactoring in preparation for another patch that I'm working on.
Right now, we dispatch the memory copies via `hsa_amd_memory_async_copy`.
This selects the default SDMA engine for the data transfer, which may not be ideal.
ROCm 5.7 introduces a new API that allows selecting the SDMA engine
used for a specific transfer. In order to make the functional changes more visible,
I'd like to first land this NFC (no functional change) refactoring.
From 03f997af5fa395528248f6ac84f5383153d5825e Mon Sep 17 00:00:00 2001
From: JP Lehr <JanPatrick.Lehr at amd.com>
Date: Mon, 16 Oct 2023 05:36:39 -0400
Subject: [PATCH] [OpenMP][NFC] Refactor to prepare for SDMA patch
---
.../plugins-nextgen/amdgpu/src/rtl.cpp | 330 +++++++++---------
1 file changed, 171 insertions(+), 159 deletions(-)
diff --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
index 66a25e29d016276..8591f2cfbbb95e9 100644
--- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
+++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
@@ -1153,34 +1153,7 @@ struct AMDGPUStreamTy {
/// Push an asynchronous memory copy between pinned memory buffers.
Error pushPinnedMemoryCopyAsync(void *Dst, const void *Src,
- uint64_t CopySize) {
- // Retrieve an available signal for the operation's output.
- AMDGPUSignalTy *OutputSignal = nullptr;
- if (auto Err = SignalManager.getResource(OutputSignal))
- return Err;
- OutputSignal->reset();
- OutputSignal->increaseUseCount();
-
- std::lock_guard<std::mutex> Lock(Mutex);
-
- // Consume stream slot and compute dependencies.
- auto [Curr, InputSignal] = consume(OutputSignal);
-
- // Avoid defining the input dependency if already satisfied.
- if (InputSignal && !InputSignal->load())
- InputSignal = nullptr;
-
- // Issue the async memory copy.
- hsa_status_t Status;
- if (InputSignal) {
- hsa_signal_t InputSignalRaw = InputSignal->get();
- Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 1,
- &InputSignalRaw, OutputSignal->get());
- } else
- Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 0,
- nullptr, OutputSignal->get());
- return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
- }
+ uint64_t CopySize);
/// Push an asynchronous memory copy device-to-host involving an unpinned
/// memory buffer. The operation consists of a two-step copy from the
@@ -1190,65 +1163,7 @@ struct AMDGPUStreamTy {
/// manager once the operation completes.
Error pushMemoryCopyD2HAsync(void *Dst, const void *Src, void *Inter,
uint64_t CopySize,
- AMDGPUMemoryManagerTy &MemoryManager) {
- // Retrieve available signals for the operation's outputs.
- AMDGPUSignalTy *OutputSignals[2] = {};
- if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
- return Err;
- for (auto Signal : OutputSignals) {
- Signal->reset();
- Signal->increaseUseCount();
- }
-
- std::lock_guard<std::mutex> Lock(Mutex);
-
- // Consume stream slot and compute dependencies.
- auto [Curr, InputSignal] = consume(OutputSignals[0]);
-
- // Avoid defining the input dependency if already satisfied.
- if (InputSignal && !InputSignal->load())
- InputSignal = nullptr;
-
- // Setup the post action for releasing the intermediate buffer.
- if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
- return Err;
-
- // Issue the first step: device to host transfer. Avoid defining the input
- // dependency if already satisfied.
- hsa_status_t Status;
- if (InputSignal) {
- hsa_signal_t InputSignalRaw = InputSignal->get();
- Status =
- hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 1,
- &InputSignalRaw, OutputSignals[0]->get());
- } else {
- Status = hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 0,
- nullptr, OutputSignals[0]->get());
- }
-
- if (auto Err =
- Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s"))
- return Err;
-
- // Consume another stream slot and compute dependencies.
- std::tie(Curr, InputSignal) = consume(OutputSignals[1]);
- assert(InputSignal && "Invalid input signal");
-
- // The std::memcpy is done asynchronously using an async handler. We store
- // the function's information in the action but it's not actually an action.
- if (auto Err = Slots[Curr].schedHostMemoryCopy(Dst, Inter, CopySize))
- return Err;
-
- // Make changes on this slot visible to the async handler's thread.
- std::atomic_thread_fence(std::memory_order_release);
-
- // Issue the second step: host to host transfer.
- Status = hsa_amd_signal_async_handler(
- InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
- (void *)&Slots[Curr]);
-
- return Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s");
- }
+ AMDGPUMemoryManagerTy &MemoryManager);
/// Push an asynchronous memory copy host-to-device involving an unpinned
/// memory buffer. The operation consists of a two-step copy from the
@@ -1258,78 +1173,7 @@ struct AMDGPUStreamTy {
/// manager once the operation completes.
Error pushMemoryCopyH2DAsync(void *Dst, const void *Src, void *Inter,
uint64_t CopySize,
- AMDGPUMemoryManagerTy &MemoryManager) {
- // Retrieve available signals for the operation's outputs.
- AMDGPUSignalTy *OutputSignals[2] = {};
- if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
- return Err;
- for (auto Signal : OutputSignals) {
- Signal->reset();
- Signal->increaseUseCount();
- }
-
- AMDGPUSignalTy *OutputSignal = OutputSignals[0];
-
- std::lock_guard<std::mutex> Lock(Mutex);
-
- // Consume stream slot and compute dependencies.
- auto [Curr, InputSignal] = consume(OutputSignal);
-
- // Avoid defining the input dependency if already satisfied.
- if (InputSignal && !InputSignal->load())
- InputSignal = nullptr;
-
- // Issue the first step: host to host transfer.
- if (InputSignal) {
- // The std::memcpy is done asynchronously using an async handler. We store
- // the function's information in the action but it is not actually a
- // post action.
- if (auto Err = Slots[Curr].schedHostMemoryCopy(Inter, Src, CopySize))
- return Err;
-
- // Make changes on this slot visible to the async handler's thread.
- std::atomic_thread_fence(std::memory_order_release);
-
- hsa_status_t Status = hsa_amd_signal_async_handler(
- InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
- (void *)&Slots[Curr]);
-
- if (auto Err = Plugin::check(Status,
- "Error in hsa_amd_signal_async_handler: %s"))
- return Err;
-
- // Let's use now the second output signal.
- OutputSignal = OutputSignals[1];
-
- // Consume another stream slot and compute dependencies.
- std::tie(Curr, InputSignal) = consume(OutputSignal);
- } else {
- // All preceding operations completed, copy the memory synchronously.
- std::memcpy(Inter, Src, CopySize);
-
- // Return the second signal because it will not be used.
- OutputSignals[1]->decreaseUseCount();
- if (auto Err = SignalManager.returnResource(OutputSignals[1]))
- return Err;
- }
-
- // Setup the post action to release the intermediate pinned buffer.
- if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
- return Err;
-
- // Issue the second step: host to device transfer. Avoid defining the input
- // dependency if already satisfied.
- hsa_status_t Status;
- if (InputSignal && InputSignal->load()) {
- hsa_signal_t InputSignalRaw = InputSignal->get();
- Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 1,
- &InputSignalRaw, OutputSignal->get());
- } else
- Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 0,
- nullptr, OutputSignal->get());
-
- return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
- }
+ AMDGPUMemoryManagerTy &MemoryManager);
/// Synchronize with the stream. The current thread waits until all operations
/// are finalized and it performs the pending post actions (i.e., releasing
@@ -3179,6 +3023,174 @@ void *AMDGPUDeviceTy::allocate(size_t Size, void *, TargetAllocTy Kind) {
return Alloc;
}
+Error AMDGPUStreamTy::pushPinnedMemoryCopyAsync(void *Dst, const void *Src,
+ uint64_t CopySize) {
+ // Retrieve an available signal for the operation's output.
+ AMDGPUSignalTy *OutputSignal = nullptr;
+ if (auto Err = SignalManager.getResource(OutputSignal))
+ return Err;
+ OutputSignal->reset();
+ OutputSignal->increaseUseCount();
+
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ // Consume stream slot and compute dependencies.
+ auto [Curr, InputSignal] = consume(OutputSignal);
+
+ // Avoid defining the input dependency if already satisfied.
+ if (InputSignal && !InputSignal->load())
+ InputSignal = nullptr;
+
+ // Issue the async memory copy.
+ hsa_status_t Status;
+ if (InputSignal) {
+ hsa_signal_t InputSignalRaw = InputSignal->get();
+ Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 1,
+ &InputSignalRaw, OutputSignal->get());
+ } else
+ Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 0,
+ nullptr, OutputSignal->get());
+
+ return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
+}
+
+Error AMDGPUStreamTy::pushMemoryCopyD2HAsync(
+ void *Dst, const void *Src, void *Inter, uint64_t CopySize,
+ AMDGPUMemoryManagerTy &MemoryManager) {
+ // Retrieve available signals for the operation's outputs.
+ AMDGPUSignalTy *OutputSignals[2] = {};
+ if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
+ return Err;
+ for (auto Signal : OutputSignals) {
+ Signal->reset();
+ Signal->increaseUseCount();
+ }
+
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ // Consume stream slot and compute dependencies.
+ auto [Curr, InputSignal] = consume(OutputSignals[0]);
+
+ // Avoid defining the input dependency if already satisfied.
+ if (InputSignal && !InputSignal->load())
+ InputSignal = nullptr;
+
+ // Setup the post action for releasing the intermediate buffer.
+ if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
+ return Err;
+
+ // Issue the first step: device to host transfer. Avoid defining the input
+ // dependency if already satisfied.
+ hsa_status_t Status;
+ if (InputSignal) {
+ hsa_signal_t InputSignalRaw = InputSignal->get();
+ Status =
+ hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 1,
+ &InputSignalRaw, OutputSignals[0]->get());
+ } else {
+ Status = hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 0,
+ nullptr, OutputSignals[0]->get());
+ }
+
+ if (auto Err =
+ Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s"))
+ return Err;
+
+ // Consume another stream slot and compute dependencies.
+ std::tie(Curr, InputSignal) = consume(OutputSignals[1]);
+ assert(InputSignal && "Invalid input signal");
+
+ // The std::memcpy is done asynchronously using an async handler. We store
+ // the function's information in the action but it's not actually an action.
+ if (auto Err = Slots[Curr].schedHostMemoryCopy(Dst, Inter, CopySize))
+ return Err;
+
+ // Make changes on this slot visible to the async handler's thread.
+ std::atomic_thread_fence(std::memory_order_release);
+
+ // Issue the second step: host to host transfer.
+ Status = hsa_amd_signal_async_handler(
+ InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
+ (void *)&Slots[Curr]);
+
+ return Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s");
+}
+
+Error AMDGPUStreamTy::pushMemoryCopyH2DAsync(
+ void *Dst, const void *Src, void *Inter, uint64_t CopySize,
+ AMDGPUMemoryManagerTy &MemoryManager) {
+ // Retrieve available signals for the operation's outputs.
+ AMDGPUSignalTy *OutputSignals[2] = {};
+ if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
+ return Err;
+ for (auto Signal : OutputSignals) {
+ Signal->reset();
+ Signal->increaseUseCount();
+ }
+
+ AMDGPUSignalTy *OutputSignal = OutputSignals[0];
+
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ // Consume stream slot and compute dependencies.
+ auto [Curr, InputSignal] = consume(OutputSignal);
+
+ // Avoid defining the input dependency if already satisfied.
+ if (InputSignal && !InputSignal->load())
+ InputSignal = nullptr;
+
+ // Issue the first step: host to host transfer.
+ if (InputSignal) {
+ // The std::memcpy is done asynchronously using an async handler. We store
+ // the function's information in the action but it is not actually a
+ // post action.
+ if (auto Err = Slots[Curr].schedHostMemoryCopy(Inter, Src, CopySize))
+ return Err;
+
+ // Make changes on this slot visible to the async handler's thread.
+ std::atomic_thread_fence(std::memory_order_release);
+
+ hsa_status_t Status = hsa_amd_signal_async_handler(
+ InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
+ (void *)&Slots[Curr]);
+
+ if (auto Err =
+ Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s"))
+ return Err;
+
+ // Let's use now the second output signal.
+ OutputSignal = OutputSignals[1];
+
+ // Consume another stream slot and compute dependencies.
+ std::tie(Curr, InputSignal) = consume(OutputSignal);
+ } else {
+ // All preceding operations completed, copy the memory synchronously.
+ std::memcpy(Inter, Src, CopySize);
+
+ // Return the second signal because it will not be used.
+ OutputSignals[1]->decreaseUseCount();
+ if (auto Err = SignalManager.returnResource(OutputSignals[1]))
+ return Err;
+ }
+
+ // Setup the post action to release the intermediate pinned buffer.
+ if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
+ return Err;
+
+ // Issue the second step: host to device transfer. Avoid defining the input
+ // dependency if already satisfied.
+ hsa_status_t Status;
+ if (InputSignal && InputSignal->load()) {
+ hsa_signal_t InputSignalRaw = InputSignal->get();
+ Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 1,
+ &InputSignalRaw, OutputSignal->get());
+ } else
+ Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 0,
+ nullptr, OutputSignal->get());
+
+ return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
+}
+
} // namespace plugin
} // namespace target
} // namespace omp
More information about the Openmp-commits
mailing list