[Openmp-commits] [openmp] feed674 - [OpenMP] Introduce stream pool to make sure the correctness of device synchr...

Shilei Tian via Openmp-commits openmp-commits at lists.llvm.org
Sat Apr 11 04:14:38 PDT 2020


Author: Shilei Tian
Date: 2020-04-11T07:08:56-04:00
New Revision: feed674deca13fc66ee19fce9d555d1cd91535ca

URL: https://github.com/llvm/llvm-project/commit/feed674deca13fc66ee19fce9d555d1cd91535ca
DIFF: https://github.com/llvm/llvm-project/commit/feed674deca13fc66ee19fce9d555d1cd91535ca.diff

LOG: [OpenMP] Introduce stream pool to make sure the correctness of device synchr...
...onization

Summary: In the previous patch, in order to optimize performance, we only synchronize once
for each target region. The synchronization is done via stream synchronization.
However, in an extreme situation, the performance might be bad. Consider the
following case: There is a task that requires transferring a huge amount of data
(it calls the data transfer functions many times). It is scheduled to the first
stream. And then we have 255 very light tasks scheduled to the remaining 255
streams (by default we have 256 streams). They can be finished before we do
synchronization at the end of the first task. Next, we get another very huge
task. It will be scheduled again to the first stream. Now the first task
finishes its kernel launch and calls stream synchronization. Right now, the
stream already contains two kernels, and the synchronization will wait until the
two kernels finish instead of just the first one for the first task.

In this patch, we introduce a stream pool. After each synchronization, the stream
is returned to the pool to make sure that each synchronization only waits for
the expected operations.

Reviewers: jdoerfert

Reviewed By: jdoerfert

Subscribers: gregrodgers, yaxunl, lildmh, guansong, openmp-commits

Tags: #openmp

Differential Revision: https://reviews.llvm.org/D77412

Added: 
    

Modified: 
    openmp/libomptarget/plugins/cuda/src/rtl.cpp

Removed: 
    


################################################################################
diff  --git a/openmp/libomptarget/plugins/cuda/src/rtl.cpp b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
index 1147f821b7ae..4c38ee8c2d28 100644
--- a/openmp/libomptarget/plugins/cuda/src/rtl.cpp
+++ b/openmp/libomptarget/plugins/cuda/src/rtl.cpp
@@ -16,6 +16,7 @@
 #include <cuda.h>
 #include <list>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -80,7 +81,8 @@ struct KernelTy {
 };
 
 /// Device environment data
-/// Manually sync with the deviceRTL side for now, move to a dedicated header file later.
+/// Manually sync with the deviceRTL side for now, move to a dedicated header
+/// file later.
 struct omptarget_device_environmentTy {
   int32_t debug_level;
 };
@@ -89,16 +91,164 @@ struct omptarget_device_environmentTy {
 /// FIXME: we may need this to be per device and per library.
 std::list<KernelTy> KernelsList;
 
+namespace {
+bool checkResult(CUresult Err, const char *ErrMsg) {
+  if (Err == CUDA_SUCCESS)
+    return true;
+
+  DP(ErrMsg);
+  CUDA_ERR_STRING(Err);
+  return false;
+}
+} // namespace
+
+class StreamManagerTy {
+  int NumberOfDevices;
+  // Per-device stream mutex
+  std::vector<std::unique_ptr<std::mutex>> StreamMtx;
+  // Per-device stream Id indicates the next available stream in the pool
+  std::vector<int> NextStreamId;
+  // Per-device stream pool
+  std::vector<std::vector<CUstream>> StreamPool;
+  // Pointer to per-device context
+  std::vector<CUcontext> &ContextsPtr;
+
+  // If there is no CUstream left in the pool, we will resize the pool to
+  // allocate more CUstream. This function should be called with device mutex,
+  // and we do not resize to smaller one.
+  void resizeStreamPool(const int DeviceId, const size_t NewSize) {
+    std::vector<CUstream> &Pool = StreamPool[DeviceId];
+    const size_t CurrentSize = Pool.size();
+    assert(NewSize > CurrentSize && "new size is not larger than current size");
+
+    Pool.resize(NewSize, nullptr);
+
+    CUresult err = cuCtxSetCurrent(ContextsPtr[DeviceId]);
+    if (!checkResult(err, "Error when setting current CUDA context\n")) {
+      // We will return if cannot switch to the right context in case of
+      // creating bunch of streams that are not corresponding to the right
+      // device. The offloading will fail later because selected CUstream is
+      // nullptr.
+      return;
+    }
+
+    for (size_t I = CurrentSize; I < NewSize; ++I) {
+      err = cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING);
+      checkResult(err,
+                  "Error when creating CUDA stream to resize stream pool\n");
+    }
+  }
+
+public:
+  StreamManagerTy(const int NumberOfDevices, std::vector<CUcontext> &CtxPtr)
+      : NumberOfDevices(NumberOfDevices), ContextsPtr(CtxPtr) {
+    StreamPool.resize(NumberOfDevices);
+    NextStreamId.resize(NumberOfDevices);
+    StreamMtx.resize(NumberOfDevices);
+
+    // Initially let's create 32 streams for each device
+    int EnvNumInitialStreams = 32;
+    char *envStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS");
+    if (envStr)
+      EnvNumInitialStreams = std::stoi(envStr);
+
+    // Initialize the stream pool for each device
+    for (std::vector<CUstream> &S : StreamPool)
+      S.resize(EnvNumInitialStreams);
+
+    // Initialize the next stream id
+    std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
+
+    // Initialize stream mutex
+    for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
+      Ptr = std::make_unique<std::mutex>();
+  }
+
+  ~StreamManagerTy() {
+    // Destroy streams
+    for (int I = 0; I < NumberOfDevices; ++I) {
+      CUresult err = cuCtxSetCurrent(ContextsPtr[I]);
+      checkResult(err, "Error when setting current CUDA context\n");
+
+      for (CUstream &S : StreamPool[I]) {
+        if (!S)
+          continue;
+        err = cuStreamDestroy(S);
+        checkResult(err, "Error when destroying CUDA stream\n");
+      }
+    }
+  }
+
+  // Get a CUstream from pool. Per-device next stream id always points to the
+  // next available CUstream. That means, CUstreams [0, id-1] have been
+  // assigned, and [id,] are still available. If there is no CUstream left, we
+  // will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
+  // the id will increase one.
+  // xxxxxs+++++++++
+  //      ^
+  //      id
+  // After assignment, the pool becomes the following and s is assigned.
+  // xxxxxs+++++++++
+  //       ^
+  //       id
+  CUstream getStream(const int DeviceId) {
+    assert(DeviceId >= 0 &&
+           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+           "Unexpected device id");
+
+    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+    int &Id = NextStreamId[DeviceId];
+    // No CUstream left in the pool, we need to request from CUDA RT
+    if (Id == StreamPool[DeviceId].size()) {
+      // By default we double the stream pool every time
+      resizeStreamPool(DeviceId, Id * 2);
+    }
+    return StreamPool[DeviceId][Id++];
+  }
+
+  // Return a CUstream back to pool. As mentioned above, per-device next
+  // stream is always points to the next available CUstream, so when we return
+  // a CUstream, we need to first decrease the id, and then copy the CUstream
+  // back.
+  // It is worth noting that, the order of streams return might be different
+  // from that they're assigned, that saying, at some point, there might be
+  // two identical CUstreams.
+  // xxax+a+++++
+  //     ^
+  //     id
+  // However, it doesn't matter, because they're always on the two sides of
+  // id. The left one will in the end be overwritten by another CUstream.
+  // Therefore, after several execution, the order of pool might be different
+  // from its initial state.
+  void returnStream(const int DeviceId, CUstream Stream) {
+    assert(DeviceId >= 0 &&
+           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
+           "Unexpected device id");
+
+    const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
+    int &Id = NextStreamId[DeviceId];
+    assert(Id > 0 && "Wrong stream ID");
+    StreamPool[DeviceId][--Id] = Stream;
+  }
+
+  void initializeDevice(int DeviceId) {
+    // This function should be called after setting right context
+    for (CUstream &Stream : StreamPool[DeviceId]) {
+      CUresult Err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
+      checkResult(Err, "Error when creating CUDA stream\n");
+    }
+  }
+};
+
 /// Class containing all the device information.
 class RTLDeviceInfoTy {
   std::vector<std::list<FuncOrGblEntryTy>> FuncGblEntries;
-  std::vector<std::unique_ptr<std::atomic_uint>> NextStreamId;
+  std::shared_ptr<StreamManagerTy> StreamManager;
 
 public:
   int NumberOfDevices;
   std::vector<CUmodule> Modules;
   std::vector<CUcontext> Contexts;
-  std::vector<std::vector<CUstream>> Streams;
 
   // Device properties
   std::vector<int> ThreadsPerBlock;
@@ -112,17 +262,31 @@ class RTLDeviceInfoTy {
   // OpenMP Environment properties
   int EnvNumTeams;
   int EnvTeamLimit;
-  int EnvNumStreams;
 
   // OpenMP Requires Flags
   int64_t RequiresFlags;
 
-  //static int EnvNumThreads;
-  static const int HardTeamLimit = 1<<16; // 64k
+  // static int EnvNumThreads;
+  static const int HardTeamLimit = 1 << 16; // 64k
   static const int HardThreadLimit = 1024;
   static const int DefaultNumTeams = 128;
   static const int DefaultNumThreads = 128;
 
+  std::shared_ptr<StreamManagerTy> getStreamManager() { return StreamManager; }
+
+  CUstream getStream(const int DeviceId) {
+    return StreamManager->getStream(DeviceId);
+  }
+
+  void returnStream(const int DeviceId, __tgt_async_info *AsyncInfoPtr) {
+    assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr");
+    assert(AsyncInfoPtr->Queue && "AsyncInfoPtr->Queue is nullptr");
+
+    StreamManager->returnStream(
+        DeviceId, reinterpret_cast<CUstream>(AsyncInfoPtr->Queue));
+    AsyncInfoPtr->Queue = nullptr;
+  }
+
   // Record entry point associated with device
   void addOffloadEntry(int32_t device_id, __tgt_offload_entry entry) {
     assert(device_id < (int32_t)FuncGblEntries.size() &&
@@ -178,15 +342,6 @@ class RTLDeviceInfoTy {
     E.Table.EntriesBegin = E.Table.EntriesEnd = 0;
   }
 
-  // Get the next stream on a given device in a round robin manner
-  CUstream &getNextStream(const int DeviceId) {
-    assert(DeviceId >= 0 &&
-           static_cast<size_t>(DeviceId) < NextStreamId.size() &&
-           "Unexpected device id!");
-    const unsigned int Id = NextStreamId[DeviceId]->fetch_add(1);
-    return Streams[DeviceId][Id % EnvNumStreams];
-  }
-
   RTLDeviceInfoTy() {
 #ifdef OMPTARGET_DEBUG
     if (char *envStr = getenv("LIBOMPTARGET_DEBUG")) {
@@ -219,8 +374,6 @@ class RTLDeviceInfoTy {
 
     FuncGblEntries.resize(NumberOfDevices);
     Contexts.resize(NumberOfDevices);
-    Streams.resize(NumberOfDevices);
-    NextStreamId.resize(NumberOfDevices);
     ThreadsPerBlock.resize(NumberOfDevices);
     BlocksPerGrid.resize(NumberOfDevices);
     WarpSize.resize(NumberOfDevices);
@@ -245,28 +398,17 @@ class RTLDeviceInfoTy {
       EnvNumTeams = -1;
     }
 
-    // By default let's create 256 streams per device
-    EnvNumStreams = 256;
-    envStr = getenv("LIBOMPTARGET_NUM_STREAMS");
-    if (envStr) {
-      EnvNumStreams = std::stoi(envStr);
-    }
-
-    // Initialize streams for each device
-    for (std::vector<CUstream> &S : Streams) {
-      S.resize(EnvNumStreams);
-    }
-
-    // Initialize the next stream id
-    for (std::unique_ptr<std::atomic_uint> &Ptr : NextStreamId) {
-      Ptr = std::make_unique<std::atomic_uint>(0);
-    }
+    StreamManager =
+        std::make_shared<StreamManagerTy>(NumberOfDevices, Contexts);
 
     // Default state.
     RequiresFlags = OMP_REQ_UNDEFINED;
   }
 
   ~RTLDeviceInfoTy() {
+    // First destruct stream manager in case of Contexts is destructed before it
+    StreamManager = nullptr;
+
     // Close modules
     for (auto &module : Modules)
       if (module) {
@@ -277,24 +419,6 @@ class RTLDeviceInfoTy {
         }
       }
 
-    // Destroy streams before contexts
-    for (int I = 0; I < NumberOfDevices; ++I) {
-      CUresult err = cuCtxSetCurrent(Contexts[I]);
-      if (err != CUDA_SUCCESS) {
-        DP("Error when setting current CUDA context\n");
-        CUDA_ERR_STRING(err);
-      }
-
-      for (auto &S : Streams[I])
-        if (S) {
-          err = cuStreamDestroy(S);
-          if (err != CUDA_SUCCESS) {
-            DP("Error when destroying CUDA stream\n");
-            CUDA_ERR_STRING(err);
-          }
-        }
-    }
-
     // Destroy contexts
     for (auto &ctx : Contexts)
       if (ctx) {
@@ -310,14 +434,13 @@ class RTLDeviceInfoTy {
 static RTLDeviceInfoTy DeviceInfo;
 
 namespace {
-CUstream selectStream(int32_t Id, __tgt_async_info *AsyncInfo) {
-  if (!AsyncInfo)
-    return DeviceInfo.getNextStream(Id);
+CUstream getStream(int32_t DeviceId, __tgt_async_info *AsyncInfoPtr) {
+  assert(AsyncInfoPtr && "AsyncInfoPtr is nullptr");
 
-  if (!AsyncInfo->Queue)
-    AsyncInfo->Queue = DeviceInfo.getNextStream(Id);
+  if (!AsyncInfoPtr->Queue)
+    AsyncInfoPtr->Queue = DeviceInfo.getStream(DeviceId);
 
-  return reinterpret_cast<CUstream>(AsyncInfo->Queue);
+  return reinterpret_cast<CUstream>(AsyncInfoPtr->Queue);
 }
 
 int32_t dataRetrieve(int32_t DeviceId, void *HstPtr, void *TgtPtr, int64_t Size,
@@ -331,7 +454,7 @@ int32_t dataRetrieve(int32_t DeviceId, void *HstPtr, void *TgtPtr, int64_t Size,
     return OFFLOAD_FAIL;
   }
 
-  CUstream Stream = selectStream(DeviceId, AsyncInfoPtr);
+  CUstream Stream = getStream(DeviceId, AsyncInfoPtr);
 
   err = cuMemcpyDtoHAsync(HstPtr, (CUdeviceptr)TgtPtr, Size, Stream);
   if (err != CUDA_SUCCESS) {
@@ -356,7 +479,7 @@ int32_t dataSubmit(int32_t DeviceId, void *TgtPtr, void *HstPtr, int64_t Size,
     return OFFLOAD_FAIL;
   }
 
-  CUstream Stream = selectStream(DeviceId, AsyncInfoPtr);
+  CUstream Stream = getStream(DeviceId, AsyncInfoPtr);
 
   err = cuMemcpyHtoDAsync((CUdeviceptr)TgtPtr, HstPtr, Size, Stream);
   if (err != CUDA_SUCCESS) {
@@ -413,13 +536,8 @@ int32_t __tgt_rtl_init_device(int32_t device_id) {
     CUDA_ERR_STRING(err);
   }
 
-  for (CUstream &Stream : DeviceInfo.Streams[device_id]) {
-    err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
-    if (err != CUDA_SUCCESS) {
-      DP("Error when creating CUDA stream\n");
-      CUDA_ERR_STRING(err);
-    }
-  }
+  // Initialize stream pool
+  DeviceInfo.getStreamManager()->initializeDevice(device_id);
 
   // Query attributes to determine number of threads/block and blocks/grid.
   int maxGridDimX;
@@ -894,7 +1012,7 @@ int32_t __tgt_rtl_run_target_team_region_async(
   DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid,
      cudaThreadsPerBlock);
 
-  CUstream Stream = selectStream(device_id, async_info);
+  CUstream Stream = getStream(device_id, async_info);
   err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1,
                        cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/,
                        Stream, &args[0], 0);
@@ -948,6 +1066,12 @@ int32_t __tgt_rtl_synchronize(int32_t device_id, __tgt_async_info *async_info) {
     CUDA_ERR_STRING(Err);
     return OFFLOAD_FAIL;
   }
+
+  // Once the stream is synchronized, return it to stream pool and reset
+  // async_info. This is to make sure the synchronization only works for its own
+  // tasks.
+  DeviceInfo.returnStream(device_id, async_info);
+
   return OFFLOAD_SUCCESS;
 }
 


        


More information about the Openmp-commits mailing list