[Openmp-commits] [openmp] a697a0a - [OpenMP][Plugin] Introduce generic resource pool
Shilei Tian via Openmp-commits
openmp-commits at lists.llvm.org
Mon Dec 27 08:32:18 PST 2021
Author: Shilei Tian
Date: 2021-12-27T11:32:14-05:00
New Revision: a697a0a4b669a2b99cc973fe5f5164df309d285c
URL: https://github.com/llvm/llvm-project/commit/a697a0a4b669a2b99cc973fe5f5164df309d285c
DIFF: https://github.com/llvm/llvm-project/commit/a697a0a4b669a2b99cc973fe5f5164df309d285c.diff
LOG: [OpenMP][Plugin] Introduce generic resource pool
Currently CUDA streams are managed by `StreamManagerTy`. It works very well. Now
we have the need that some resources, such as CUDA stream and event, will be
held by `libomptarget`. It is always good to buffer those resources. What's more
important, given the way that `libomptarget` and plugins are connected, we cannot
make sure whether plugins are still alive when `libomptarget` is destroyed. That
leads to an issue that those resources held by `libomptarget` might not be
released correctly. As a result, we need a unified management of all the resources
that can be shared between `libomptarget` and plugins.
`ResourcePoolTy` is designed to manage the type of resource for one device.
It has to work with an allocator which is supposed to provide `create` and
`destroy`. In this way, when the plugin is destroyed, we can make sure that
all resources allocated from native runtime library will be released correctly,
no matter whether `libomptarget` starts its destroy.
Reviewed By: ye-luo
Differential Revision: https://reviews.llvm.org/D111954
Added:
Modified:
openmp/libomptarget/plugins/cuda/src/rtl.cpp
Removed:
################################################################################
diff --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
index 90b6281e3e137..ed26f2f7731f6 100644
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -187,137 +187,125 @@ struct DeviceDataTy {
int NumThreads = 0;
};
-class StreamManagerTy {
- int NumberOfDevices;
- // The initial size of stream pool
- int EnvNumInitialStreams;
- // Per-device stream mutex
- std::vector<std::unique_ptr<std::mutex>> StreamMtx;
- // Per-device stream Id indicates the next available stream in the pool
- std::vector<int> NextStreamId;
- // Per-device stream pool
- std::vector<std::vector<CUstream>> StreamPool;
- // Reference to per-device data
- std::vector<DeviceDataTy> &DeviceData;
-
- // If there is no CUstream left in the pool, we will resize the pool to
- // allocate more CUstream. This function should be called with device mutex,
- // and we do not resize to smaller one.
- void resizeStreamPool(const int DeviceId, const size_t NewSize) {
- std::vector<CUstream> &Pool = StreamPool[DeviceId];
- const size_t CurrentSize = Pool.size();
- assert(NewSize > CurrentSize && "new size is not larger than current size");
-
- CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context);
- if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) {
- // We will return if cannot switch to the right context in case of
- // creating bunch of streams that are not corresponding to the right
- // device. The offloading will fail later because selected CUstream is
- // nullptr.
- return;
- }
-
- Pool.resize(NewSize, nullptr);
+/// Resource allocator where \p T is the resource type.
+/// Functions \p create and \p destroy return OFFLOAD_SUCCESS and OFFLOAD_FAIL
+/// accordingly. The implementation should not raise any exception.
+template <typename T> class AllocatorTy {
+public:
+ /// Create a resource and assign to R.
+ int create(T &R) noexcept;
+ /// Destroy the resource.
+ int destroy(T) noexcept;
+};
- for (size_t I = CurrentSize; I < NewSize; ++I) {
- checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING),
- "Error returned from cuStreamCreate\n");
- }
- }
+/// Allocator for CUstream.
+template <> class AllocatorTy<CUstream> {
+ CUcontext Context;
public:
- StreamManagerTy(const int NumberOfDevices,
- std::vector<DeviceDataTy> &DeviceData)
- : NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32),
- DeviceData(DeviceData) {
- StreamPool.resize(NumberOfDevices);
- NextStreamId.resize(NumberOfDevices);
- StreamMtx.resize(NumberOfDevices);
+ AllocatorTy(CUcontext C) noexcept : Context(C) {}
- if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS"))
- EnvNumInitialStreams = std::stoi(EnvStr);
+ /// See AllocatorTy<T>::create.
+ int create(CUstream &Stream) noexcept {
+ if (!checkResult(cuCtxSetCurrent(Context),
+ "Error returned from cuCtxSetCurrent\n"))
+ return OFFLOAD_FAIL;
- // Initialize the next stream id
- std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+ if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING),
+ "Error returned from cuStreamCreate\n"))
+ return OFFLOAD_FAIL;
- // Initialize stream mutex
- for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
- Ptr = std::make_unique<std::mutex>();
+ return OFFLOAD_SUCCESS;
}
- ~StreamManagerTy() {
- // Destroy streams
- for (int I = 0; I < NumberOfDevices; ++I) {
- checkResult(cuCtxSetCurrent(DeviceData[I].Context),
- "Error returned from cuCtxSetCurrent\n");
+ /// See AllocatorTy<T>::destroy.
+ int destroy(CUstream Stream) noexcept {
+ if (!checkResult(cuCtxSetCurrent(Context),
+ "Error returned from cuCtxSetCurrent\n"))
+ return OFFLOAD_FAIL;
+ if (!checkResult(cuStreamDestroy(Stream),
+ "Error returned from cuStreamDestroy\n"))
+ return OFFLOAD_FAIL;
- for (CUstream &S : StreamPool[I]) {
- if (S)
- checkResult(cuStreamDestroy(S),
- "Error returned from cuStreamDestroy\n");
- }
- }
+ return OFFLOAD_SUCCESS;
}
+};
- // Get a CUstream from pool. Per-device next stream id always points to the
- // next available CUstream. That means, CUstreams [0, id-1] have been
- // assigned, and [id,] are still available. If there is no CUstream left, we
- // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
- // the id will increase one.
- // xxxxxs+++++++++
- // ^
- // id
- // After assignment, the pool becomes the following and s is assigned.
- // xxxxxs+++++++++
- // ^
- // id
- CUstream getStream(const int DeviceId) {
- const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
- int &Id = NextStreamId[DeviceId];
- // No CUstream left in the pool, we need to request from CUDA RT
- if (Id == static_cast<int>(StreamPool[DeviceId].size())) {
- // By default we double the stream pool every time
- resizeStreamPool(DeviceId, Id * 2);
+/// A generic pool of resources where \p T is the resource type.
+/// \p T should be copyable as the object is stored in \p std::vector .
+template <typename T> class ResourcePoolTy {
+ /// Index of the next available resource.
+ size_t Next = 0;
+ /// Mutex to guard the pool.
+ std::mutex Mutex;
+ /// Pool of resources.
+ std::vector<T> Resources;
+ /// A reference to the corresponding allocator.
+ AllocatorTy<T> Allocator;
+
+ /// If `Resources` is used up, we will fill in more resources. It assumes that
+ /// the new size `Size` should be always larger than the current size.
+ bool resize(size_t Size) {
+ auto CurSize = Resources.size();
+ assert(Size > CurSize && "Unexpected smaller size");
+ Resources.reserve(Size);
+ for (auto I = CurSize; I < Size; ++I) {
+ T NewItem;
+ int Ret = Allocator.create(NewItem);
+ if (Ret != OFFLOAD_SUCCESS)
+ return false;
+ Resources.push_back(NewItem);
}
- return StreamPool[DeviceId][Id++];
+ return true;
}
- // Return a CUstream back to pool. As mentioned above, per-device next
- // stream is always points to the next available CUstream, so when we return
- // a CUstream, we need to first decrease the id, and then copy the CUstream
- // back.
- // It is worth noting that, the order of streams return might be different
- // from that they're assigned, that saying, at some point, there might be
- // two identical CUstreams.
- // xxax+a+++++
- // ^
- // id
- // However, it doesn't matter, because they're always on the two sides of
- // id. The left one will in the end be overwritten by another CUstream.
- // Therefore, after several execution, the order of pool might be different
- // from its initial state.
- void returnStream(const int DeviceId, CUstream Stream) {
- const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
- int &Id = NextStreamId[DeviceId];
- assert(Id > 0 && "Wrong stream ID");
- StreamPool[DeviceId][--Id] = Stream;
+public:
+ ResourcePoolTy(AllocatorTy<T> &&A, size_t Size = 0) noexcept
+ : Allocator(std::move(A)) {
+ (void)resize(Size);
}
- bool initializeDeviceStreamPool(const int DeviceId) {
- assert(StreamPool[DeviceId].empty() && "stream pool has been initialized");
+ ~ResourcePoolTy() noexcept {
+ for (auto &R : Resources)
+ (void)Allocator.destroy(R);
+ }
- resizeStreamPool(DeviceId, EnvNumInitialStreams);
+ /// Get a resource from pool. `Next` always points to the next available
+ /// resource. That means, `[0, next-1]` have been assigned, and `[id,]` are
+ /// still available. If there is no resource left, we will ask for more. Each
+ /// time a resource is assigned, the id will increase one.
+ /// xxxxxs+++++++++
+ /// ^
+ /// Next
+ /// After assignment, the pool becomes the following and s is assigned.
+ /// xxxxxs+++++++++
+ /// ^
+ /// Next
+ int acquire(T &R) noexcept {
+ std::lock_guard<std::mutex> LG(Mutex);
+ if (Next == Resources.size() && !resize(Resources.size() * 2))
+ return OFFLOAD_FAIL;
- // Check the size of stream pool
- if (static_cast<int>(StreamPool[DeviceId].size()) != EnvNumInitialStreams)
- return false;
+ R = Resources[Next++];
- // Check whether each stream is valid
- for (CUstream &S : StreamPool[DeviceId])
- if (!S)
- return false;
+ return OFFLOAD_SUCCESS;
+ }
- return true;
+ /// Return the resource back to the pool. When we return a resource, we need
+ /// to first decrease `Next`, and then copy the resource back. It is worth
+ /// noting that, the order of resources return might be different from that
+ /// they're assigned, that saying, at some point, there might be two identical
+ /// resources.
+ /// xxax+a+++++
+ /// ^
+ /// Next
+ /// However, it doesn't matter, because they're always on the two sides of
+ /// `Next`. The left one will in the end be overwritten by another resource.
+ /// Therefore, after several execution, the order of pool might be different
+ /// from its initial state.
+ void release(T R) noexcept {
+ std::lock_guard<std::mutex> LG(Mutex);
+ Resources[--Next] = R;
}
};
@@ -331,13 +319,18 @@ class DeviceRTLTy {
int64_t RequiresFlags;
// Amount of dynamic shared memory to use at launch.
uint64_t DynamicMemorySize;
+ // Number of initial streams for each device.
+ int NumInitialStreams = 32;
static constexpr const int HardTeamLimit = 1U << 16U; // 64k
static constexpr const int HardThreadLimit = 1024;
static constexpr const int DefaultNumTeams = 128;
static constexpr const int DefaultNumThreads = 128;
- std::unique_ptr<StreamManagerTy> StreamManager;
+ using StreamPoolTy = ResourcePoolTy<CUstream>;
+ using StreamAllocatorTy = AllocatorTy<CUstream>;
+ std::vector<std::unique_ptr<StreamPoolTy>> StreamPool;
+
std::vector<DeviceDataTy> DeviceData;
std::vector<CUmodule> Modules;
@@ -471,8 +464,13 @@ class DeviceRTLTy {
CUstream getStream(const int DeviceId, __tgt_async_info *AsyncInfo) const {
assert(AsyncInfo && "AsyncInfo is nullptr");
- if (!AsyncInfo->Queue)
- AsyncInfo->Queue = StreamManager->getStream(DeviceId);
+ if (!AsyncInfo->Queue) {
+ CUstream S;
+ if (StreamPool[DeviceId]->acquire(S) != OFFLOAD_SUCCESS)
+ return nullptr;
+
+ AsyncInfo->Queue = S;
+ }
return reinterpret_cast<CUstream>(AsyncInfo->Queue);
}
@@ -509,6 +507,7 @@ class DeviceRTLTy {
}
DeviceData.resize(NumberOfDevices);
+ StreamPool.resize(NumberOfDevices);
// Get environment variables regarding teams
if (const char *EnvStr = getenv("OMP_TEAM_LIMIT")) {
@@ -532,9 +531,11 @@ class DeviceRTLTy {
DP("Parsed LIBOMPTARGET_SHARED_MEMORY_SIZE = %" PRIu64 "\n",
DynamicMemorySize);
}
-
- StreamManager =
- std::make_unique<StreamManagerTy>(NumberOfDevices, DeviceData);
+ if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) {
+ // LIBOMPTARGET_NUM_INITIAL_STREAMS has been set
+ NumInitialStreams = std::stoi(EnvStr);
+ DP("Parsed LIBOMPTARGET_NUM_INITIAL_STREAMS=%d\n", NumInitialStreams);
+ }
for (int I = 0; I < NumberOfDevices; ++I)
DeviceAllocators.emplace_back(I, DeviceData);
@@ -556,13 +557,14 @@ class DeviceRTLTy {
for (auto &M : MemoryManagers)
M.release();
- StreamManager = nullptr;
-
for (CUmodule &M : Modules)
// Close module
if (M)
checkResult(cuModuleUnload(M), "Error returned from cuModuleUnload\n");
+ for (auto &S : StreamPool)
+ S = nullptr;
+
for (DeviceDataTy &D : DeviceData) {
// Destroy context
if (D.Context) {
@@ -627,8 +629,9 @@ class DeviceRTLTy {
return OFFLOAD_FAIL;
// Initialize stream pool
- if (!StreamManager->initializeDeviceStreamPool(DeviceId))
- return OFFLOAD_FAIL;
+ if (!StreamPool[DeviceId])
+ StreamPool[DeviceId] = std::make_unique<StreamPoolTy>(
+ StreamAllocatorTy(DeviceData[DeviceId].Context), NumInitialStreams);
// Query attributes to determine number of threads/block and blocks/grid.
int MaxGridDimX;
@@ -1195,8 +1198,7 @@ class DeviceRTLTy {
// Once the stream is synchronized, return it to stream pool and reset
// AsyncInfo. This is to make sure the synchronization only works for its
// own tasks.
- StreamManager->returnStream(DeviceId,
- reinterpret_cast<CUstream>(AsyncInfo->Queue));
+ StreamPool[DeviceId]->release(reinterpret_cast<CUstream>(AsyncInfo->Queue));
AsyncInfo->Queue = nullptr;
if (Err != CUDA_SUCCESS) {
More information about the Openmp-commits
mailing list