[Openmp-commits] [openmp] [OpenMP][NFC] Refactor to prepare for SDMA engine patch (PR #69351)

Jan Patrick Lehr via Openmp-commits openmp-commits at lists.llvm.org
Tue Oct 17 09:09:13 PDT 2023


https://github.com/jplehr created https://github.com/llvm/llvm-project/pull/69351

This is a refactoring in preparation for another patch that I'm working on.

Right now, we dispatch the memory copies via `hsa_amd_memory_async_copy`.
This selects the default SDMA engine for the data transfer, which may not be ideal.
With ROCm 5.7, a new API is introduced that allows selecting the SDMA engine
used for a specific transfer. In order to make the functional changes more visible,
I'd like to first land this NFC (no functional change) refactoring.

>From 03f997af5fa395528248f6ac84f5383153d5825e Mon Sep 17 00:00:00 2001
From: JP Lehr <JanPatrick.Lehr at amd.com>
Date: Mon, 16 Oct 2023 05:36:39 -0400
Subject: [PATCH] [OpenMP][NFC] Refactor to prepare for SDMA patch

---
 .../plugins-nextgen/amdgpu/src/rtl.cpp        | 330 +++++++++---------
 1 file changed, 171 insertions(+), 159 deletions(-)

diff --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
index 66a25e29d016276..8591f2cfbbb95e9 100644
--- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
+++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
@@ -1153,34 +1153,7 @@ struct AMDGPUStreamTy {
 
   /// Push an asynchronous memory copy between pinned memory buffers.
   Error pushPinnedMemoryCopyAsync(void *Dst, const void *Src,
-                                  uint64_t CopySize) {
-    // Retrieve an available signal for the operation's output.
-    AMDGPUSignalTy *OutputSignal = nullptr;
-    if (auto Err = SignalManager.getResource(OutputSignal))
-      return Err;
-    OutputSignal->reset();
-    OutputSignal->increaseUseCount();
-
-    std::lock_guard<std::mutex> Lock(Mutex);
-
-    // Consume stream slot and compute dependencies.
-    auto [Curr, InputSignal] = consume(OutputSignal);
-
-    // Avoid defining the input dependency if already satisfied.
-    if (InputSignal && !InputSignal->load())
-      InputSignal = nullptr;
-
-    // Issue the async memory copy.
-    hsa_status_t Status;
-    if (InputSignal) {
-      hsa_signal_t InputSignalRaw = InputSignal->get();
-      Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 1,
-                                         &InputSignalRaw, OutputSignal->get());
-    } else
-      Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 0,
-                                         nullptr, OutputSignal->get());
-    return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
-  }
+                                  uint64_t CopySize);
 
   /// Push an asynchronous memory copy device-to-host involving an unpinned
   /// memory buffer. The operation consists of a two-step copy from the
@@ -1190,65 +1163,7 @@ struct AMDGPUStreamTy {
   /// manager once the operation completes.
   Error pushMemoryCopyD2HAsync(void *Dst, const void *Src, void *Inter,
                                uint64_t CopySize,
-                               AMDGPUMemoryManagerTy &MemoryManager) {
-    // Retrieve available signals for the operation's outputs.
-    AMDGPUSignalTy *OutputSignals[2] = {};
-    if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
-      return Err;
-    for (auto Signal : OutputSignals) {
-      Signal->reset();
-      Signal->increaseUseCount();
-    }
-
-    std::lock_guard<std::mutex> Lock(Mutex);
-
-    // Consume stream slot and compute dependencies.
-    auto [Curr, InputSignal] = consume(OutputSignals[0]);
-
-    // Avoid defining the input dependency if already satisfied.
-    if (InputSignal && !InputSignal->load())
-      InputSignal = nullptr;
-
-    // Setup the post action for releasing the intermediate buffer.
-    if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
-      return Err;
-
-    // Issue the first step: device to host transfer. Avoid defining the input
-    // dependency if already satisfied.
-    hsa_status_t Status;
-    if (InputSignal) {
-      hsa_signal_t InputSignalRaw = InputSignal->get();
-      Status =
-          hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 1,
-                                    &InputSignalRaw, OutputSignals[0]->get());
-    } else {
-      Status = hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 0,
-                                         nullptr, OutputSignals[0]->get());
-    }
-
-    if (auto Err =
-            Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s"))
-      return Err;
-
-    // Consume another stream slot and compute dependencies.
-    std::tie(Curr, InputSignal) = consume(OutputSignals[1]);
-    assert(InputSignal && "Invalid input signal");
-
-    // The std::memcpy is done asynchronously using an async handler. We store
-    // the function's information in the action but it's not actually an action.
-    if (auto Err = Slots[Curr].schedHostMemoryCopy(Dst, Inter, CopySize))
-      return Err;
-
-    // Make changes on this slot visible to the async handler's thread.
-    std::atomic_thread_fence(std::memory_order_release);
-
-    // Issue the second step: host to host transfer.
-    Status = hsa_amd_signal_async_handler(
-        InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
-        (void *)&Slots[Curr]);
-
-    return Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s");
-  }
+                               AMDGPUMemoryManagerTy &MemoryManager);
 
   /// Push an asynchronous memory copy host-to-device involving an unpinned
   /// memory buffer. The operation consists of a two-step copy from the
@@ -1258,78 +1173,7 @@ struct AMDGPUStreamTy {
   /// manager once the operation completes.
   Error pushMemoryCopyH2DAsync(void *Dst, const void *Src, void *Inter,
                                uint64_t CopySize,
-                               AMDGPUMemoryManagerTy &MemoryManager) {
-    // Retrieve available signals for the operation's outputs.
-    AMDGPUSignalTy *OutputSignals[2] = {};
-    if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
-      return Err;
-    for (auto Signal : OutputSignals) {
-      Signal->reset();
-      Signal->increaseUseCount();
-    }
-
-    AMDGPUSignalTy *OutputSignal = OutputSignals[0];
-
-    std::lock_guard<std::mutex> Lock(Mutex);
-
-    // Consume stream slot and compute dependencies.
-    auto [Curr, InputSignal] = consume(OutputSignal);
-
-    // Avoid defining the input dependency if already satisfied.
-    if (InputSignal && !InputSignal->load())
-      InputSignal = nullptr;
-
-    // Issue the first step: host to host transfer.
-    if (InputSignal) {
-      // The std::memcpy is done asynchronously using an async handler. We store
-      // the function's information in the action but it is not actually a
-      // post action.
-      if (auto Err = Slots[Curr].schedHostMemoryCopy(Inter, Src, CopySize))
-        return Err;
-
-      // Make changes on this slot visible to the async handler's thread.
-      std::atomic_thread_fence(std::memory_order_release);
-
-      hsa_status_t Status = hsa_amd_signal_async_handler(
-          InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
-          (void *)&Slots[Curr]);
-
-      if (auto Err = Plugin::check(Status,
-                                   "Error in hsa_amd_signal_async_handler: %s"))
-        return Err;
-
-      // Let's use now the second output signal.
-      OutputSignal = OutputSignals[1];
-
-      // Consume another stream slot and compute dependencies.
-      std::tie(Curr, InputSignal) = consume(OutputSignal);
-    } else {
-      // All preceding operations completed, copy the memory synchronously.
-      std::memcpy(Inter, Src, CopySize);
-
-      // Return the second signal because it will not be used.
-      OutputSignals[1]->decreaseUseCount();
-      if (auto Err = SignalManager.returnResource(OutputSignals[1]))
-        return Err;
-    }
-
-    // Setup the post action to release the intermediate pinned buffer.
-    if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
-      return Err;
-
-    // Issue the second step: host to device transfer. Avoid defining the input
-    // dependency if already satisfied.
-    hsa_status_t Status;
-    if (InputSignal && InputSignal->load()) {
-      hsa_signal_t InputSignalRaw = InputSignal->get();
-      Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 1,
-                                         &InputSignalRaw, OutputSignal->get());
-    } else
-      Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 0,
-                                         nullptr, OutputSignal->get());
-
-    return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
-  }
+                               AMDGPUMemoryManagerTy &MemoryManager);
 
   /// Synchronize with the stream. The current thread waits until all operations
   /// are finalized and it performs the pending post actions (i.e., releasing
@@ -3179,6 +3023,174 @@ void *AMDGPUDeviceTy::allocate(size_t Size, void *, TargetAllocTy Kind) {
   return Alloc;
 }
 
+Error AMDGPUStreamTy::pushPinnedMemoryCopyAsync(void *Dst, const void *Src,
+                                                uint64_t CopySize) {
+  // Acquire a signal from the signal manager to use as the copy's output.
+  AMDGPUSignalTy *OutputSignal = nullptr;
+  if (auto Err = SignalManager.getResource(OutputSignal))
+    return Err;
+  OutputSignal->reset();
+  OutputSignal->increaseUseCount();
+
+  std::lock_guard<std::mutex> Lock(Mutex);
+
+  // Consume a stream slot and obtain the input dependency signal, if any.
+  auto [Curr, InputSignal] = consume(OutputSignal);
+
+  // Drop the input dependency if its signal already reads zero (satisfied).
+  if (InputSignal && !InputSignal->load())
+    InputSignal = nullptr;
+
+  // Issue the async copy, passing the input signal only if one remains.
+  hsa_status_t Status;
+  if (InputSignal) {
+    hsa_signal_t InputSignalRaw = InputSignal->get();
+    Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 1,
+                                       &InputSignalRaw, OutputSignal->get());
+  } else
+    Status = hsa_amd_memory_async_copy(Dst, Agent, Src, Agent, CopySize, 0,
+                                       nullptr, OutputSignal->get());
+
+  return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
+}
+
+Error AMDGPUStreamTy::pushMemoryCopyD2HAsync(
+    void *Dst, const void *Src, void *Inter, uint64_t CopySize,
+    AMDGPUMemoryManagerTy &MemoryManager) {
+  // Acquire two output signals, one for each step of the operation.
+  AMDGPUSignalTy *OutputSignals[2] = {};
+  if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
+    return Err;
+  for (auto Signal : OutputSignals) {
+    Signal->reset();
+    Signal->increaseUseCount();
+  }
+
+  std::lock_guard<std::mutex> Lock(Mutex);
+
+  // Consume a stream slot and obtain the input dependency signal, if any.
+  auto [Curr, InputSignal] = consume(OutputSignals[0]);
+
+  // Drop the input dependency if its signal already reads zero (satisfied).
+  if (InputSignal && !InputSignal->load())
+    InputSignal = nullptr;
+
+  // Set up the post action that releases the intermediate buffer.
+  if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
+    return Err;
+
+  // Issue the first step: device-to-host copy from Src into Inter. Pass the
+  // input dependency only if it is still pending.
+  hsa_status_t Status;
+  if (InputSignal) {
+    hsa_signal_t InputSignalRaw = InputSignal->get();
+    Status =
+        hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 1,
+                                  &InputSignalRaw, OutputSignals[0]->get());
+  } else {
+    Status = hsa_amd_memory_async_copy(Inter, Agent, Src, Agent, CopySize, 0,
+                                       nullptr, OutputSignals[0]->get());
+  }
+
+  if (auto Err =
+          Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s"))
+    return Err;
+
+  // Consume another stream slot and compute its dependencies.
+  std::tie(Curr, InputSignal) = consume(OutputSignals[1]);
+  assert(InputSignal && "Invalid input signal");
+
+  // The std::memcpy is done asynchronously by an async handler. Its arguments
+  // are stored in the slot's action, although it is not really a post action.
+  if (auto Err = Slots[Curr].schedHostMemoryCopy(Dst, Inter, CopySize))
+    return Err;
+
+  // Make the changes to this slot visible to the async handler's thread.
+  std::atomic_thread_fence(std::memory_order_release);
+
+  // Issue the second step: host-to-host copy, run when InputSignal equals 0.
+  Status = hsa_amd_signal_async_handler(
+      InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
+      (void *)&Slots[Curr]);
+
+  return Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s");
+}
+
+Error AMDGPUStreamTy::pushMemoryCopyH2DAsync(
+    void *Dst, const void *Src, void *Inter, uint64_t CopySize,
+    AMDGPUMemoryManagerTy &MemoryManager) {
+  // Acquire two output signals, one for each step of the operation.
+  AMDGPUSignalTy *OutputSignals[2] = {};
+  if (auto Err = SignalManager.getResources(/*Num=*/2, OutputSignals))
+    return Err;
+  for (auto Signal : OutputSignals) {
+    Signal->reset();
+    Signal->increaseUseCount();
+  }
+
+  AMDGPUSignalTy *OutputSignal = OutputSignals[0];
+
+  std::lock_guard<std::mutex> Lock(Mutex);
+
+  // Consume a stream slot and obtain the input dependency signal, if any.
+  auto [Curr, InputSignal] = consume(OutputSignal);
+
+  // Drop the input dependency if its signal already reads zero (satisfied).
+  if (InputSignal && !InputSignal->load())
+    InputSignal = nullptr;
+
+  // Issue the first step: host-to-host copy from Src into Inter.
+  if (InputSignal) {
+    // The std::memcpy is done asynchronously by an async handler. Its
+    // arguments are stored in the slot's action, although it is not really a
+    // post action.
+    if (auto Err = Slots[Curr].schedHostMemoryCopy(Inter, Src, CopySize))
+      return Err;
+
+    // Make the changes to this slot visible to the async handler's thread.
+    std::atomic_thread_fence(std::memory_order_release);
+
+    hsa_status_t Status = hsa_amd_signal_async_handler(
+        InputSignal->get(), HSA_SIGNAL_CONDITION_EQ, 0, asyncActionCallback,
+        (void *)&Slots[Curr]);
+
+    if (auto Err =
+            Plugin::check(Status, "Error in hsa_amd_signal_async_handler: %s"))
+      return Err;
+
+    // From now on, use the second output signal.
+    OutputSignal = OutputSignals[1];
+
+    // Consume another stream slot and compute its dependencies.
+    std::tie(Curr, InputSignal) = consume(OutputSignal);
+  } else {
+    // All preceding operations have completed; copy the memory synchronously.
+    std::memcpy(Inter, Src, CopySize);
+
+    // Return the second signal to the manager because it will not be used.
+    OutputSignals[1]->decreaseUseCount();
+    if (auto Err = SignalManager.returnResource(OutputSignals[1]))
+      return Err;
+  }
+
+  // Set up the post action that releases the intermediate pinned buffer.
+  if (auto Err = Slots[Curr].schedReleaseBuffer(Inter, MemoryManager))
+    return Err;
+
+  // Issue the second step: host-to-device copy from Inter into Dst. Pass the
+  // input dependency only if it is still pending.
+  hsa_status_t Status;
+  if (InputSignal && InputSignal->load()) {
+    hsa_signal_t InputSignalRaw = InputSignal->get();
+    Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 1,
+                                       &InputSignalRaw, OutputSignal->get());
+  } else
+    Status = hsa_amd_memory_async_copy(Dst, Agent, Inter, Agent, CopySize, 0,
+                                       nullptr, OutputSignal->get());
+
+  return Plugin::check(Status, "Error in hsa_amd_memory_async_copy: %s");
+}
+
 } // namespace plugin
 } // namespace target
 } // namespace omp



More information about the Openmp-commits mailing list