[Openmp-commits] [openmp] a697a0a - [OpenMP][Plugin] Introduce generic resource pool

Shilei Tian via Openmp-commits openmp-commits at lists.llvm.org
Mon Dec 27 08:32:18 PST 2021


Author: Shilei Tian
Date: 2021-12-27T11:32:14-05:00
New Revision: a697a0a4b669a2b99cc973fe5f5164df309d285c

URL: https://github.com/llvm/llvm-project/commit/a697a0a4b669a2b99cc973fe5f5164df309d285c
DIFF: https://github.com/llvm/llvm-project/commit/a697a0a4b669a2b99cc973fe5f5164df309d285c.diff

LOG: [OpenMP][Plugin] Introduce generic resource pool

Currently CUDA streams are managed by `StreamManagerTy`. It works very well. Now
we have the need that some resources, such as CUDA streams and events, will be
held by `libomptarget`. It is always good to buffer those resources. What's more
important, given the way that `libomptarget` and plugins are connected, we cannot
make sure whether plugins are still alive when `libomptarget` is destroyed. That
leads to an issue that those resources held by `libomptarget` might not be
released correctly. As a result, we need a unified management of all the resources
that can be shared between `libomptarget` and plugins.

`ResourcePoolTy` is designed to manage one type of resource for one device.
It has to work with an allocator which is supposed to provide `create` and
`destroy`. In this way, when the plugin is destroyed, we can make sure that
all resources allocated from the native runtime library will be released correctly,
regardless of whether `libomptarget` has started its own destruction.

Reviewed By: ye-luo

Differential Revision: https://reviews.llvm.org/D111954

Added: 
    

Modified: 
    openmp/libomptarget/plugins/cuda/src/rtl.cpp

Removed: 
    


################################################################################
diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
index 90b6281e3e137..ed26f2f7731f6 100644
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -187,137 +187,125 @@ struct DeviceDataTy {
   int NumThreads = 0;
 };
 
-class StreamManagerTy {
-  int NumberOfDevices;
-  // The initial size of stream pool
-  int EnvNumInitialStreams;
-  // Per-device stream mutex
-  std::vector<std::unique_ptr<std::mutex>> StreamMtx;
-  // Per-device stream Id indicates the next available stream in the pool
-  std::vector<int> NextStreamId;
-  // Per-device stream pool
-  std::vector<std::vector<CUstream>> StreamPool;
-  // Reference to per-device data
-  std::vector<DeviceDataTy> &DeviceData;
-
-  // If there is no CUstream left in the pool, we will resize the pool to
-  // allocate more CUstream. This function should be called with device mutex,
-  // and we do not resize to smaller one.
-  void resizeStreamPool(const int DeviceId, const size_t NewSize) {
-    std::vector<CUstream> &Pool = StreamPool[DeviceId];
-    const size_t CurrentSize = Pool.size();
-    assert(NewSize > CurrentSize && "new size is not larger than current size");
-
-    CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context);
-    if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) {
-      // We will return if cannot switch to the right context in case of
-      // creating bunch of streams that are not corresponding to the right
-      // device. The offloading will fail later because selected CUstream is
-      // nullptr.
-      return;
-    }
-
-    Pool.resize(NewSize, nullptr);
+/// Resource allocator where \p T is the resource type.
+/// Functions \p create and \p destroy return OFFLOAD_SUCCESS and OFFLOAD_FAIL
+/// accordingly. The implementation should not raise any exception.
+template <typename T> class AllocatorTy {
+public:
+  /// Create a resource and assign to R.
+  int create(T &R) noexcept;
+  /// Destroy the resource.
+  int destroy(T) noexcept;
+};
 
-    for (size_t I = CurrentSize; I < NewSize; ++I) {
-      checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING),
-                  "Error returned from cuStreamCreate\n");
-    }
-  }
+/// Allocator for CUstream.
+template <> class AllocatorTy<CUstream> {
+  CUcontext Context;
 
 public:
-  StreamManagerTy(const int NumberOfDevices,
-                  std::vector<DeviceDataTy> &DeviceData)
-      : NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32),
-        DeviceData(DeviceData) {
-    StreamPool.resize(NumberOfDevices);
-    NextStreamId.resize(NumberOfDevices);
-    StreamMtx.resize(NumberOfDevices);
+  AllocatorTy(CUcontext C) noexcept : Context(C) {}
 
-    if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS"))
-      EnvNumInitialStreams = std::stoi(EnvStr);
+  /// See AllocatorTy<T>::create.
+  int create(CUstream &Stream) noexcept {
+    if (!checkResult(cuCtxSetCurrent(Context),
+                     "Error returned from cuCtxSetCurrent\n"))
+      return OFFLOAD_FAIL;
 
-    // Initialize the next stream id
-    std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+    if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING),
+                     "Error returned from cuStreamCreate\n"))
+      return OFFLOAD_FAIL;
 
-    // Initialize stream mutex
-    for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
-      Ptr = std::make_unique<std::mutex>();
+    return OFFLOAD_SUCCESS;
   }
 
-  ~StreamManagerTy() {
-    // Destroy streams
-    for (int I = 0; I < NumberOfDevices; ++I) {
-      checkResult(cuCtxSetCurrent(DeviceData[I].Context),
-                  "Error returned from cuCtxSetCurrent\n");
+  /// See AllocatorTy<T>::destroy.
+  int destroy(CUstream Stream) noexcept {
+    if (!checkResult(cuCtxSetCurrent(Context),
+                     "Error returned from cuCtxSetCurrent\n"))
+      return OFFLOAD_FAIL;
+    if (!checkResult(cuStreamDestroy(Stream),
+                     "Error returned from cuStreamDestroy\n"))
+      return OFFLOAD_FAIL;
 
-      for (CUstream &S : StreamPool[I]) {
-        if (S)
-          checkResult(cuStreamDestroy(S),
-                      "Error returned from cuStreamDestroy\n");
-      }
-    }
+    return OFFLOAD_SUCCESS;
   }
+};
 
-  // Get a CUstream from pool. Per-device next stream id always points to the
-  // next available CUstream. That means, CUstreams [0, id-1] have been
-  // assigned, and [id,] are still available. If there is no CUstream left, we
-  // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
-  // the id will increase one.
-  // xxxxxs+++++++++
-  //      ^
-  //      id
-  // After assignment, the pool becomes the following and s is assigned.
-  // xxxxxs+++++++++
-  //       ^
-  //       id
-  CUstream getStream(const int DeviceId) {
-    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
-    int &Id = NextStreamId[DeviceId];
-    // No CUstream left in the pool, we need to request from CUDA RT
-    if (Id == static_cast<int>(StreamPool[DeviceId].size())) {
-      // By default we double the stream pool every time
-      resizeStreamPool(DeviceId, Id * 2);
+/// A generic pool of resources where \p T is the resource type.
+/// \p T should be copyable as the object is stored in \p std::vector .
+template <typename T> class ResourcePoolTy {
+  /// Index of the next available resource.
+  size_t Next = 0;
+  /// Mutex to guard the pool.
+  std::mutex Mutex;
+  /// Pool of resources.
+  std::vector<T> Resources;
+  /// A reference to the corresponding allocator.
+  AllocatorTy<T> Allocator;
+
+  /// If `Resources` is used up, we will fill in more resources. It assumes that
+  /// the new size `Size` should be always larger than the current size.
+  bool resize(size_t Size) {
+    auto CurSize = Resources.size();
+    assert(Size > CurSize && "Unexpected smaller size");
+    Resources.reserve(Size);
+    for (auto I = CurSize; I < Size; ++I) {
+      T NewItem;
+      int Ret = Allocator.create(NewItem);
+      if (Ret != OFFLOAD_SUCCESS)
+        return false;
+      Resources.push_back(NewItem);
     }
-    return StreamPool[DeviceId][Id++];
+    return true;
   }
 
-  // Return a CUstream back to pool. As mentioned above, per-device next
-  // stream is always points to the next available CUstream, so when we return
-  // a CUstream, we need to first decrease the id, and then copy the CUstream
-  // back.
-  // It is worth noting that, the order of streams return might be different
-  // from that they're assigned, that saying, at some point, there might be
-  // two identical CUstreams.
-  // xxax+a+++++
-  //     ^
-  //     id
-  // However, it doesn't matter, because they're always on the two sides of
-  // id. The left one will in the end be overwritten by another CUstream.
-  // Therefore, after several execution, the order of pool might be different
-  // from its initial state.
-  void returnStream(const int DeviceId, CUstream Stream) {
-    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
-    int &Id = NextStreamId[DeviceId];
-    assert(Id > 0 && "Wrong stream ID");
-    StreamPool[DeviceId][--Id] = Stream;
+public:
+  ResourcePoolTy(AllocatorTy<T> &&A, size_t Size = 0) noexcept
+      : Allocator(std::move(A)) {
+    (void)resize(Size);
   }
 
-  bool initializeDeviceStreamPool(const int DeviceId) {
-    assert(StreamPool[DeviceId].empty() && "stream pool has been initialized");
+  ~ResourcePoolTy() noexcept {
+    for (auto &R : Resources)
+      (void)Allocator.destroy(R);
+  }
 
-    resizeStreamPool(DeviceId, EnvNumInitialStreams);
+  /// Get a resource from pool. `Next` always points to the next available
+  /// resource. That means, `[0, next-1]` have been assigned, and `[id,]` are
+  /// still available. If there is no resource left, we will ask for more. Each
+  /// time a resource is assigned, the id will increase one.
+  /// xxxxxs+++++++++
+  ///      ^
+  ///      Next
+  /// After assignment, the pool becomes the following and s is assigned.
+  /// xxxxxs+++++++++
+  ///       ^
+  ///       Next
+  int acquire(T &R) noexcept {
+    std::lock_guard<std::mutex> LG(Mutex);
+    if (Next == Resources.size() && !resize(Resources.size() * 2))
+      return OFFLOAD_FAIL;
 
-    // Check the size of stream pool
-    if (static_cast<int>(StreamPool[DeviceId].size()) != EnvNumInitialStreams)
-      return false;
+    R = Resources[Next++];
 
-    // Check whether each stream is valid
-    for (CUstream &S : StreamPool[DeviceId])
-      if (!S)
-        return false;
+    return OFFLOAD_SUCCESS;
+  }
 
-    return true;
+  /// Return the resource back to the pool. When we return a resource, we need
+  /// to first decrease `Next`, and then copy the resource back. It is worth
+  /// noting that, the order of resources return might be different from that
+  /// they're assigned, that saying, at some point, there might be two identical
+  /// resources.
+  /// xxax+a+++++
+  ///     ^
+  ///     Next
+  /// However, it doesn't matter, because they're always on the two sides of
+  /// `Next`. The left one will in the end be overwritten by another resource.
+  /// Therefore, after several execution, the order of pool might be different
+  /// from its initial state.
+  void release(T R) noexcept {
+    std::lock_guard<std::mutex> LG(Mutex);
+    Resources[--Next] = R;
   }
 };
 
@@ -331,13 +319,18 @@ class DeviceRTLTy {
   int64_t RequiresFlags;
   // Amount of dynamic shared memory to use at launch.
   uint64_t DynamicMemorySize;
+  // Number of initial streams for each device.
+  int NumInitialStreams = 32;
 
   static constexpr const int HardTeamLimit = 1U << 16U; // 64k
   static constexpr const int HardThreadLimit = 1024;
   static constexpr const int DefaultNumTeams = 128;
   static constexpr const int DefaultNumThreads = 128;
 
-  std::unique_ptr<StreamManagerTy> StreamManager;
+  using StreamPoolTy = ResourcePoolTy<CUstream>;
+  using StreamAllocatorTy = AllocatorTy<CUstream>;
+  std::vector<std::unique_ptr<StreamPoolTy>> StreamPool;
+
   std::vector<DeviceDataTy> DeviceData;
   std::vector<CUmodule> Modules;
 
@@ -471,8 +464,13 @@ class DeviceRTLTy {
   CUstream getStream(const int DeviceId, __tgt_async_info *AsyncInfo) const {
     assert(AsyncInfo && "AsyncInfo is nullptr");
 
-    if (!AsyncInfo->Queue)
-      AsyncInfo->Queue = StreamManager->getStream(DeviceId);
+    if (!AsyncInfo->Queue) {
+      CUstream S;
+      if (StreamPool[DeviceId]->acquire(S) != OFFLOAD_SUCCESS)
+        return nullptr;
+
+      AsyncInfo->Queue = S;
+    }
 
     return reinterpret_cast<CUstream>(AsyncInfo->Queue);
   }
@@ -509,6 +507,7 @@ class DeviceRTLTy {
     }
 
     DeviceData.resize(NumberOfDevices);
+    StreamPool.resize(NumberOfDevices);
 
     // Get environment variables regarding teams
     if (const char *EnvStr = getenv("OMP_TEAM_LIMIT")) {
@@ -532,9 +531,11 @@ class DeviceRTLTy {
       DP("Parsed LIBOMPTARGET_SHARED_MEMORY_SIZE = %" PRIu64 "\n",
          DynamicMemorySize);
     }
-
-    StreamManager =
-        std::make_unique<StreamManagerTy>(NumberOfDevices, DeviceData);
+    if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) {
+      // LIBOMPTARGET_NUM_INITIAL_STREAMS has been set
+      NumInitialStreams = std::stoi(EnvStr);
+      DP("Parsed LIBOMPTARGET_NUM_INITIAL_STREAMS=%d\n", NumInitialStreams);
+    }
 
     for (int I = 0; I < NumberOfDevices; ++I)
       DeviceAllocators.emplace_back(I, DeviceData);
@@ -556,13 +557,14 @@ class DeviceRTLTy {
     for (auto &M : MemoryManagers)
       M.release();
 
-    StreamManager = nullptr;
-
     for (CUmodule &M : Modules)
       // Close module
       if (M)
         checkResult(cuModuleUnload(M), "Error returned from cuModuleUnload\n");
 
+    for (auto &S : StreamPool)
+      S = nullptr;
+
     for (DeviceDataTy &D : DeviceData) {
       // Destroy context
       if (D.Context) {
@@ -627,8 +629,9 @@ class DeviceRTLTy {
       return OFFLOAD_FAIL;
 
     // Initialize stream pool
-    if (!StreamManager->initializeDeviceStreamPool(DeviceId))
-      return OFFLOAD_FAIL;
+    if (!StreamPool[DeviceId])
+      StreamPool[DeviceId] = std::make_unique<StreamPoolTy>(
+          StreamAllocatorTy(DeviceData[DeviceId].Context), NumInitialStreams);
 
     // Query attributes to determine number of threads/block and blocks/grid.
     int MaxGridDimX;
@@ -1195,8 +1198,7 @@ class DeviceRTLTy {
     // Once the stream is synchronized, return it to stream pool and reset
     // AsyncInfo. This is to make sure the synchronization only works for its
     // own tasks.
-    StreamManager->returnStream(DeviceId,
-                                reinterpret_cast<CUstream>(AsyncInfo->Queue));
+    StreamPool[DeviceId]->release(reinterpret_cast<CUstream>(AsyncInfo->Queue));
     AsyncInfo->Queue = nullptr;
 
     if (Err != CUDA_SUCCESS) {


        


More information about the Openmp-commits mailing list