[Openmp-commits] [openmp] 5b19f42 - [OpenMP][AMDGPU] Single eager resource init + HSA queue utilization tracking

Michael Halkenhaeuser via Openmp-commits openmp-commits at lists.llvm.org
Wed Aug 2 05:24:08 PDT 2023


Author: Michael Halkenhaeuser
Date: 2023-08-02T08:22:26-04:00
New Revision: 5b19f42b631dddf2bc69b667e560fd903b9ef689

URL: https://github.com/llvm/llvm-project/commit/5b19f42b631dddf2bc69b667e560fd903b9ef689
DIFF: https://github.com/llvm/llvm-project/commit/5b19f42b631dddf2bc69b667e560fd903b9ef689.diff

LOG: [OpenMP][AMDGPU] Single eager resource init + HSA queue utilization tracking

This patch lazily initializes queues/streams/events since their initialization
might come at a cost even if we do not use them.

To further benefit from this, AMDGPU/HSA queue management is moved into the
AMDGPUStreamManager of an AMDGPUDevice. Streams may now use different HSA queues
during their lifetime and identify busy queues.

When a Stream is requested from the resource manager, it will search for and
try to assign an idle queue. During the search for an idle queue the manager
may initialize more queues, up to the set maximum (default: 4).
When no idle queue could be found: resort to round robin selection.

With contributions from Johannes Doerfert <johannes at jdoerfert.de>

Depends on D156245

Reviewed By: kevinsala

Differential Revision: https://reviews.llvm.org/D154523

Added: 
    

Modified: 
    openmp/docs/design/Runtimes.rst
    openmp/libomptarget/include/Utilities.h
    openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
    openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
    openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h

Removed: 
    


################################################################################
diff  --git a/openmp/docs/design/Runtimes.rst b/openmp/docs/design/Runtimes.rst
index 263050e020cb77..84a65b183bf046 100644
--- a/openmp/docs/design/Runtimes.rst
+++ b/openmp/docs/design/Runtimes.rst
@@ -1193,7 +1193,7 @@ throughout the execution if needed. A stream is a queue of asynchronous
 operations (e.g., kernel launches and memory copies) that are executed
 sequentially. Parallelism is achieved by featuring multiple streams. The
 ``libomptarget`` leverages streams to exploit parallelism between plugin
-operations. The default value is ``32``.
+operations. The default value is ``1``, more streams are created as needed.
 
 LIBOMPTARGET_NUM_INITIAL_EVENTS
 """""""""""""""""""""""""""""""
@@ -1201,7 +1201,8 @@ LIBOMPTARGET_NUM_INITIAL_EVENTS
 This environment variable sets the number of pre-created events in the
 plugin (if supported) at initialization. More events will be created
 dynamically throughout the execution if needed. An event is used to synchronize
-a stream with another efficiently. The default value is ``32``.
+a stream with another efficiently. The default value is ``1``, more events are
+created as needed.
 
 LIBOMPTARGET_LOCK_MAPPED_HOST_BUFFERS
 """""""""""""""""""""""""""""""""""""

diff  --git a/openmp/libomptarget/include/Utilities.h b/openmp/libomptarget/include/Utilities.h
index b769d063789322..7f2884ed7ea06f 100644
--- a/openmp/libomptarget/include/Utilities.h
+++ b/openmp/libomptarget/include/Utilities.h
@@ -83,6 +83,12 @@ template <typename Ty> class Envar {
     }
   }
 
+  Envar<Ty> &operator=(const Ty &V) {
+    Data = V;
+    Initialized = true;
+    return *this;
+  }
+
   /// Get the definitive value.
   const Ty &get() const {
     // Throw a runtime error in case this envar is not initialized.

diff  --git a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
index 403083f81a42a9..85c17df84cd2e2 100644
--- a/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
+++ b/openmp/libomptarget/plugins-nextgen/amdgpu/src/rtl.cpp
@@ -583,10 +583,12 @@ using AMDGPUSignalManagerTy = GenericDeviceResourceManagerTy<AMDGPUSignalRef>;
 /// Class holding an HSA queue to submit kernel and barrier packets.
 struct AMDGPUQueueTy {
   /// Create an empty queue.
-  AMDGPUQueueTy() : Queue(nullptr), Mutex() {}
+  AMDGPUQueueTy() : Queue(nullptr), Mutex(), NumUsers(0) {}
 
-  /// Initialize a new queue belonging to a specific agent.
+  /// Lazily initialize a new queue belonging to a specific agent.
   Error init(hsa_agent_t Agent, int32_t QueueSize) {
+    if (Queue)
+      return Plugin::success();
     hsa_status_t Status =
         hsa_queue_create(Agent, QueueSize, HSA_QUEUE_TYPE_MULTI, callbackError,
                          nullptr, UINT32_MAX, UINT32_MAX, &Queue);
@@ -595,10 +597,22 @@ struct AMDGPUQueueTy {
 
   /// Deinitialize the queue and destroy its resources.
   Error deinit() {
+    std::lock_guard<std::mutex> Lock(Mutex);
+    if (!Queue)
+      return Plugin::success();
     hsa_status_t Status = hsa_queue_destroy(Queue);
     return Plugin::check(Status, "Error in hsa_queue_destroy: %s");
   }
 
+  /// Returns if this queue is considered busy
+  bool isBusy() const { return NumUsers > 0; }
+
+  /// Decrement user count of the queue object
+  void removeUser() { --NumUsers; }
+
+  /// Increase user count of the queue object
+  void addUser() { ++NumUsers; }
+
   /// Push a kernel launch to the queue. The kernel launch requires an output
   /// signal and can define an optional input signal (nullptr if none).
   Error pushKernelLaunch(const AMDGPUKernelTy &Kernel, void *KernelArgs,
@@ -611,6 +625,7 @@ struct AMDGPUQueueTy {
     // the addition of other packets to the queue. The following piece of code
     // should be lightweight; do not block the thread, allocate memory, etc.
     std::lock_guard<std::mutex> Lock(Mutex);
+    assert(Queue && "Interacted with a non-initialized queue!");
 
     // Avoid defining the input dependency if already satisfied.
     if (InputSignal && !InputSignal->load())
@@ -659,6 +674,7 @@ struct AMDGPUQueueTy {
                     const AMDGPUSignalTy *InputSignal2) {
     // Lock the queue during the packet publishing process.
     std::lock_guard<std::mutex> Lock(Mutex);
+    assert(Queue && "Interacted with a non-initialized queue!");
 
     // Push the barrier with the lock acquired.
     return pushBarrierImpl(OutputSignal, InputSignal1, InputSignal2);
@@ -777,6 +793,9 @@ struct AMDGPUQueueTy {
   /// TODO: There are other more advanced approaches to avoid this mutex using
   /// atomic operations. We can further investigate it if this is a bottleneck.
   std::mutex Mutex;
+
+  /// Indicates that the queue is busy when > 0
+  int NumUsers;
 };
 
 /// Struct that implements a stream of asynchronous operations for AMDGPU
@@ -886,7 +905,7 @@ struct AMDGPUStreamTy {
   hsa_agent_t Agent;
 
   /// The queue that the stream uses to launch kernels.
-  AMDGPUQueueTy &Queue;
+  AMDGPUQueueTy *Queue;
 
   /// The manager of signals to reuse signals.
   AMDGPUSignalManagerTy &SignalManager;
@@ -978,6 +997,9 @@ struct AMDGPUStreamTy {
   /// signal of the current stream, and 2) the last signal of the other stream.
   /// Use a barrier packet with two input signals.
   Error waitOnStreamOperation(AMDGPUStreamTy &OtherStream, uint32_t Slot) {
+    if (Queue == nullptr)
+      return Plugin::error("Target queue was nullptr");
+
     /// The signal that we must wait from the other stream.
     AMDGPUSignalTy *OtherSignal = OtherStream.Slots[Slot].Signal;
 
@@ -999,7 +1021,7 @@ struct AMDGPUStreamTy {
       return Err;
 
     // Push a barrier into the queue with both input signals.
-    return Queue.pushBarrier(OutputSignal, InputSignal, OtherSignal);
+    return Queue->pushBarrier(OutputSignal, InputSignal, OtherSignal);
   }
 
   /// Callback for running a specific asynchronous operation. This callback is
@@ -1085,6 +1107,9 @@ struct AMDGPUStreamTy {
                          uint32_t NumThreads, uint64_t NumBlocks,
                          uint32_t GroupSize,
                          AMDGPUMemoryManagerTy &MemoryManager) {
+    if (Queue == nullptr)
+      return Plugin::error("Target queue was nullptr");
+
     // Retrieve an available signal for the operation's output.
     AMDGPUSignalTy *OutputSignal = nullptr;
     if (auto Err = SignalManager.getResource(OutputSignal))
@@ -1102,8 +1127,8 @@ struct AMDGPUStreamTy {
       return Err;
 
     // Push the kernel with the output signal and an input signal (optional)
-    return Queue.pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks,
-                                  GroupSize, OutputSignal, InputSignal);
+    return Queue->pushKernelLaunch(Kernel, KernelArgs, NumThreads, NumBlocks,
+                                   GroupSize, OutputSignal, InputSignal);
   }
 
   /// Push an asynchronous memory copy between pinned memory buffers.
@@ -1331,6 +1356,8 @@ struct AMDGPUStreamTy {
 
   /// Make the stream wait on an event.
   Error waitEvent(const AMDGPUEventTy &Event);
+
+  friend struct AMDGPUStreamManagerTy;
 };
 
 /// Class representing an event on AMDGPU. The event basically stores some
@@ -1428,6 +1455,99 @@ Error AMDGPUStreamTy::waitEvent(const AMDGPUEventTy &Event) {
   return waitOnStreamOperation(RecordedStream, Event.RecordedSlot);
 }
 
+struct AMDGPUStreamManagerTy final
+    : GenericDeviceResourceManagerTy<AMDGPUResourceRef<AMDGPUStreamTy>> {
+  using ResourceRef = AMDGPUResourceRef<AMDGPUStreamTy>;
+  using ResourcePoolTy = GenericDeviceResourceManagerTy<ResourceRef>;
+
+  AMDGPUStreamManagerTy(GenericDeviceTy &Device, hsa_agent_t HSAAgent)
+      : GenericDeviceResourceManagerTy(Device), NextQueue(0), Agent(HSAAgent) {}
+
+  Error init(uint32_t InitialSize, int NumHSAQueues, int HSAQueueSize) {
+    Queues = std::vector<AMDGPUQueueTy>(NumHSAQueues);
+    QueueSize = HSAQueueSize;
+    MaxNumQueues = NumHSAQueues;
+    // Initialize one queue eagerly
+    if (auto Err = Queues.front().init(Agent, QueueSize))
+      return Err;
+
+    return GenericDeviceResourceManagerTy::init(InitialSize);
+  }
+
+  /// Deinitialize the resource pool and delete all resources. This function
+  /// must be called before the destructor.
+  Error deinit() override {
+    // De-init all queues
+    for (AMDGPUQueueTy &Queue : Queues) {
+      if (auto Err = Queue.deinit())
+        return Err;
+    }
+
+    return GenericDeviceResourceManagerTy::deinit();
+  }
+
+  /// Get a single stream from the pool or create new resources.
+  virtual Error getResource(AMDGPUStreamTy *&StreamHandle) override {
+    return getResourcesImpl(1, &StreamHandle, [this](AMDGPUStreamTy *&Handle) {
+      return assignNextQueue(Handle);
+    });
+  }
+
+  /// Return stream to the pool.
+  virtual Error returnResource(AMDGPUStreamTy *StreamHandle) override {
+    return returnResourceImpl(StreamHandle, [](AMDGPUStreamTy *Handle) {
+      Handle->Queue->removeUser();
+      return Plugin::success();
+    });
+  }
+
+private:
+  /// Search for and assign an prefereably idle queue to the given Stream. If
+  /// there is no queue without current users, resort to round robin selection.
+  inline Error assignNextQueue(AMDGPUStreamTy *Stream) {
+    uint32_t StartIndex = NextQueue % MaxNumQueues;
+    AMDGPUQueueTy *Q = nullptr;
+
+    for (int i = 0; i < MaxNumQueues; ++i) {
+      Q = &Queues[StartIndex++];
+      if (StartIndex == MaxNumQueues)
+        StartIndex = 0;
+
+      if (Q->isBusy())
+        continue;
+      else {
+        if (auto Err = Q->init(Agent, QueueSize))
+          return Err;
+
+        Q->addUser();
+        Stream->Queue = Q;
+        return Plugin::success();
+      }
+    }
+
+    // All queues busy: Round robin (StartIndex has the initial value again)
+    Queues[StartIndex].addUser();
+    Stream->Queue = &Queues[StartIndex];
+    ++NextQueue;
+    return Plugin::success();
+  }
+
+  /// The next queue index to use for round robin selection.
+  uint32_t NextQueue;
+
+  /// The queues which are assigned to requested streams.
+  std::vector<AMDGPUQueueTy> Queues;
+
+  /// The corresponding device as HSA agent.
+  hsa_agent_t Agent;
+
+  /// The maximum number of queues.
+  int MaxNumQueues;
+
+  /// The size of created queues.
+  int QueueSize;
+};
+
 /// Abstract class that holds the common members of the actual kernel devices
 /// and the host device. Both types should inherit from this class.
 struct AMDGenericDeviceTy {
@@ -1607,9 +1727,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
         OMPX_InitialNumSignals("LIBOMPTARGET_AMDGPU_NUM_INITIAL_HSA_SIGNALS",
                                64),
         OMPX_StreamBusyWait("LIBOMPTARGET_AMDGPU_STREAM_BUSYWAIT", 2000000),
-        AMDGPUStreamManager(*this), AMDGPUEventManager(*this),
-        AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice),
-        Queues() {}
+        AMDGPUStreamManager(*this, Agent), AMDGPUEventManager(*this),
+        AMDGPUSignalManager(*this), Agent(Agent), HostDevice(HostDevice) {}
 
   ~AMDGPUDeviceTy() {}
 
@@ -1676,17 +1795,12 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
       return Err;
 
     // Compute the number of queues and their size.
-    const uint32_t NumQueues = std::min(OMPX_NumQueues.get(), MaxQueues);
-    const uint32_t QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize);
-
-    // Construct and initialize each device queue.
-    Queues = std::vector<AMDGPUQueueTy>(NumQueues);
-    for (AMDGPUQueueTy &Queue : Queues)
-      if (auto Err = Queue.init(Agent, QueueSize))
-        return Err;
+    OMPX_NumQueues = std::max(1U, std::min(OMPX_NumQueues.get(), MaxQueues));
+    OMPX_QueueSize = std::min(OMPX_QueueSize.get(), MaxQueueSize);
 
     // Initialize stream pool.
-    if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams))
+    if (auto Err = AMDGPUStreamManager.init(OMPX_InitialNumStreams,
+                                            OMPX_NumQueues, OMPX_QueueSize))
       return Err;
 
     // Initialize event pool.
@@ -1725,11 +1839,6 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
       }
     }
 
-    for (AMDGPUQueueTy &Queue : Queues) {
-      if (auto Err = Queue.deinit())
-        return Err;
-    }
-
     // Invalidate agent reference.
     Agent = {0};
 
@@ -2416,19 +2525,8 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
         });
   }
 
-  /// Get the next queue in a round-robin fashion.
-  AMDGPUQueueTy &getNextQueue() {
-    static std::atomic<uint32_t> NextQueue(0);
-
-    uint32_t Current = NextQueue.fetch_add(1, std::memory_order_relaxed);
-    return Queues[Current % Queues.size()];
-  }
-
 private:
-  using AMDGPUStreamRef = AMDGPUResourceRef<AMDGPUStreamTy>;
   using AMDGPUEventRef = AMDGPUResourceRef<AMDGPUEventTy>;
-
-  using AMDGPUStreamManagerTy = GenericDeviceResourceManagerTy<AMDGPUStreamRef>;
   using AMDGPUEventManagerTy = GenericDeviceResourceManagerTy<AMDGPUEventRef>;
 
   /// Envar for controlling the number of HSA queues per device. High number of
@@ -2484,9 +2582,6 @@ struct AMDGPUDeviceTy : public GenericDeviceTy, AMDGenericDeviceTy {
 
   /// Reference to the host device.
   AMDHostDeviceTy &HostDevice;
-
-  /// List of device packet queues.
-  std::vector<AMDGPUQueueTy> Queues;
 };
 
 Error AMDGPUDeviceImageTy::loadExecutable(const AMDGPUDeviceTy &Device) {
@@ -2558,7 +2653,7 @@ Error AMDGPUResourceRef<ResourceTy>::create(GenericDeviceTy &Device) {
 }
 
 AMDGPUStreamTy::AMDGPUStreamTy(AMDGPUDeviceTy &Device)
-    : Agent(Device.getAgent()), Queue(Device.getNextQueue()),
+    : Agent(Device.getAgent()), Queue(nullptr),
       SignalManager(Device.getSignalManager()), Device(Device),
       // Initialize the std::deque with some empty positions.
       Slots(32), NextSlot(0), SyncCycle(0), RPCServer(nullptr),

diff  --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
index 6499ca54253820..d6008a83f5ceb2 100644
--- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
+++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
@@ -396,9 +396,9 @@ GenericDeviceTy::GenericDeviceTy(int32_t DeviceId, int32_t NumDevices,
       // device initialization. These cannot be consulted until the device is
       // initialized correctly. We intialize them in GenericDeviceTy::init().
       OMPX_TargetStackSize(), OMPX_TargetHeapSize(),
-      // By default, the initial number of streams and events are 32.
-      OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 32),
-      OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 32),
+      // By default, the initial number of streams and events is 1.
+      OMPX_InitialNumStreams("LIBOMPTARGET_NUM_INITIAL_STREAMS", 1),
+      OMPX_InitialNumEvents("LIBOMPTARGET_NUM_INITIAL_EVENTS", 1),
       DeviceId(DeviceId), GridValues(OMPGridValues),
       PeerAccesses(NumDevices, PeerAccessState::PENDING), PeerAccessesLock(),
       PinnedAllocs(*this), RPCServer(nullptr) {

diff  --git a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
index d670fa305ba99e..fb39f5282c2b85 100644
--- a/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
+++ b/openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
@@ -1168,7 +1168,7 @@ template <typename ResourceRef> class GenericDeviceResourceManagerTy {
 
   /// Deinitialize the resource pool and delete all resources. This function
   /// must be called before the destructor.
-  Error deinit() {
+  virtual Error deinit() {
     if (NextAvailable)
       DP("Missing %d resources to be returned\n", NextAvailable);
 
@@ -1252,7 +1252,7 @@ template <typename ResourceRef> class GenericDeviceResourceManagerTy {
     return Plugin::success();
   }
 
-private:
+protected:
   /// The resources between \p OldSize and \p NewSize need to be created or
   /// destroyed. The mutex is locked when this function is called.
   Error resizeResourcePoolImpl(uint32_t OldSize, uint32_t NewSize) {


        


More information about the Openmp-commits mailing list