[llvm] [Offload] Add MPI Plugin (PR #90890)

via llvm-commits llvm-commits at lists.llvm.org
Thu May 2 11:58:26 PDT 2024


llvmbot wrote:


<!--LLVM PR SUMMARY COMMENT-->

@llvm/pr-subscribers-offload

Author: Jhonatan Cléto (cl3to)

<details>
<summary>Changes</summary>

This patch adds a new OpenMP target offloading plugin, built on the existing PluginInterface classes, that enables offloading of computational tasks to remote x86_64 devices over MPI. Kernel launches and data transfers are handled by an event-driven architecture that pairs non-blocking MPI communication with C++20 coroutines, allowing operations to progress asynchronously.
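
To illustrate the pattern (a simplified sketch, not the plugin's actual `EventTy`/`MPIRequestManagerTy` API; names such as `SimpleTask` and `recvAsync` are invented for the example), a coroutine can post a non-blocking MPI receive and suspend itself while the request is still pending, letting a single event-handler thread drive many outstanding transfers:

```cpp
// Minimal sketch of a coroutine wrapping a non-blocking MPI receive.
// Assumption: this mirrors the general technique used by the plugin's event
// system, not its concrete types or function signatures.
#include <coroutine>
#include <mpi.h>

struct SimpleTask {
  struct promise_type {
    SimpleTask get_return_object() {
      return {std::coroutine_handle<promise_type>::from_promise(*this)};
    }
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() {}
  };

  std::coroutine_handle<promise_type> Handle;
  bool done() const { return Handle.done(); }
  void resume() const { if (!Handle.done()) Handle.resume(); }
  // (Handle lifetime management is omitted for brevity.)
};

// Post an MPI_Irecv and yield control back to the caller until it completes.
SimpleTask recvAsync(void *Buffer, int Size, int SrcRank, MPI_Comm Comm) {
  MPI_Request Request = MPI_REQUEST_NULL;
  MPI_Irecv(Buffer, Size, MPI_BYTE, SrcRank, /*Tag=*/0, Comm, &Request);

  int Completed = 0;
  while (!Completed) {
    MPI_Test(&Request, &Completed, MPI_STATUS_IGNORE);
    if (!Completed)
      co_await std::suspend_always{}; // Suspend; the event loop resumes later.
  }
  co_return;
}
```

An event-handler thread then repeatedly calls `resume()` on each pending task, sleeping between polling rounds, until `done()` returns true; this is the role played by the plugin's handler threads and the `OMPTARGET_EVENT_POLLING_RATE` environment variable below.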

Currently, the plugin lacks support for the following features:

- Host and unified/shared memory allocation/free operations
- Device environment operations such as indirect function calls
- Calls to libomptarget routines on remote target regions
- Most device features that do not work with the host plugin also do not work with the MPI plugin

Future enhancements are planned to broaden the plugin's compatibility, enabling offloading to remote devices of other architectures supported by the LLVM/Offload plugin framework, including but not limited to remote GPUs.

Currently, a program that uses the MPI plugin must be compiled in the same way as a program targeting the x86_64 (host) plugin, as shown in the following example:

```sh
clang -fopenmp -fopenmp-targets=x86_64-pc-linux-gnu -o ompt_prog ompt_prog.c
```

The MPI plugin uses a separate binary, `llvm-offload-mpi-device`, to execute target operations on the remote devices. Consequently, the program must be launched through the MPI launcher in Multiple Program Multiple Data (MPMD) mode so that the device binaries run alongside the host program. The following example runs the `ompt_prog` binary with N MPI devices:

```sh
mpirun -np N llvm-offload-mpi-device : -np 1 ./ompt_prog
```

Only one process of the OpenMP program should be created (`-np 1 ./ompt_prog`); if more than one instance is spawned, the plugin will not work correctly. By design, the host process (`ompt_prog`) must have rank `WorldSize - 1` for the plugin's MPI communication to work, so the `mpirun` command must list the executables in the order shown in the previous example.
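
For illustration, the rank layout implied by the launch line above can be summarized with plain MPI calls (a sketch of the convention only; the plugin's internal logic is not shown here):

```cpp
// Sketch: with `mpirun -np N llvm-offload-mpi-device : -np 1 ./ompt_prog`,
// ranks 0..N-1 are device processes and rank WorldSize - 1 is the single
// host process running the OpenMP program.
#include <mpi.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);

  int WorldSize = 0, Rank = 0;
  MPI_Comm_size(MPI_COMM_WORLD, &WorldSize);
  MPI_Comm_rank(MPI_COMM_WORLD, &Rank);

  bool IsHostProcess = (Rank == WorldSize - 1); // ompt_prog
  int NumMPIDevices = WorldSize - 1;            // llvm-offload-mpi-device ranks
  (void)IsHostProcess;
  (void)NumMPIDevices;

  MPI_Finalize();
  return 0;
}
```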

Building the plugin and running the test suite require an MPI implementation installed in the environment. The plugin has been tested with the Open MPI and MPICH implementations.

We do not have the resources to add a new buildbot for this plugin, so we expect the current buildbots to be updated to support it.


---

Patch is 92.67 KiB, truncated to 20.00 KiB below, full version: https://github.com/llvm/llvm-project/pull/90890.diff


30 Files Affected:

- (modified) offload/CMakeLists.txt (+2-1) 
- (modified) offload/cmake/Modules/LibomptargetGetDependencies.cmake (+18) 
- (added) offload/plugins-nextgen/mpi/CMakeLists.txt (+110) 
- (added) offload/plugins-nextgen/mpi/src/EventSystem.cpp (+1049) 
- (added) offload/plugins-nextgen/mpi/src/EventSystem.h (+470) 
- (added) offload/plugins-nextgen/mpi/src/MPIDeviceMain.cpp (+11) 
- (added) offload/plugins-nextgen/mpi/src/rtl.cpp (+685) 
- (modified) offload/test/api/omp_device_managed_memory.c (+2) 
- (modified) offload/test/api/omp_device_managed_memory_alloc.c (+2) 
- (modified) offload/test/api/omp_dynamic_shared_memory.c (+1) 
- (modified) offload/test/api/omp_host_pinned_memory.c (+2) 
- (modified) offload/test/api/omp_host_pinned_memory_alloc.c (+2) 
- (modified) offload/test/api/omp_indirect_call.c (+2) 
- (modified) offload/test/jit/empty_kernel_lvl1.c (+1) 
- (modified) offload/test/jit/empty_kernel_lvl2.c (+1) 
- (modified) offload/test/jit/type_punning.c (+1) 
- (modified) offload/test/lit.cfg (+9-1) 
- (modified) offload/test/mapping/target_derefence_array_pointrs.cpp (+1) 
- (modified) offload/test/offloading/barrier_fence.c (+1) 
- (modified) offload/test/offloading/bug49334.cpp (+1) 
- (modified) offload/test/offloading/default_thread_limit.c (+1) 
- (modified) offload/test/offloading/ompx_bare.c (+1) 
- (modified) offload/test/offloading/ompx_coords.c (+1) 
- (modified) offload/test/offloading/ompx_saxpy_mixed.c (+1) 
- (modified) offload/test/offloading/small_trip_count.c (+1) 
- (modified) offload/test/offloading/small_trip_count_thread_limit.cpp (+1) 
- (modified) offload/test/offloading/spmdization.c (+1) 
- (modified) offload/test/offloading/target_critical_region.cpp (+1) 
- (modified) offload/test/offloading/thread_limit.c (+1) 
- (modified) offload/test/offloading/workshare_chunk.c (+1) 


``````````diff
diff --git a/offload/CMakeLists.txt b/offload/CMakeLists.txt
index 3f77583ffa3b85..f6d1bbdda5e9f9 100644
--- a/offload/CMakeLists.txt
+++ b/offload/CMakeLists.txt
@@ -151,7 +151,7 @@ if (NOT LIBOMPTARGET_LLVM_INCLUDE_DIRS)
   message(FATAL_ERROR "Missing definition for LIBOMPTARGET_LLVM_INCLUDE_DIRS")
 endif()
 
-set(LIBOMPTARGET_ALL_PLUGIN_TARGETS amdgpu cuda host)
+set(LIBOMPTARGET_ALL_PLUGIN_TARGETS amdgpu cuda mpi host)
 set(LIBOMPTARGET_PLUGINS_TO_BUILD "all" CACHE STRING
     "Semicolon-separated list of plugins to use: cuda, amdgpu, host or \"all\".")
 
@@ -182,6 +182,7 @@ set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} powerpc64-ibm-linux-g
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} powerpc64-ibm-linux-gnu-LTO")
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} x86_64-pc-linux-gnu")
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} x86_64-pc-linux-gnu-LTO")
+set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} x86_64-pc-linux-gnu-mpi")
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} nvptx64-nvidia-cuda")
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} nvptx64-nvidia-cuda-LTO")
 set (LIBOMPTARGET_ALL_TARGETS "${LIBOMPTARGET_ALL_TARGETS} nvptx64-nvidia-cuda-JIT-LTO")
diff --git a/offload/cmake/Modules/LibomptargetGetDependencies.cmake b/offload/cmake/Modules/LibomptargetGetDependencies.cmake
index bbf2b9836c7095..080c07b563da4c 100644
--- a/offload/cmake/Modules/LibomptargetGetDependencies.cmake
+++ b/offload/cmake/Modules/LibomptargetGetDependencies.cmake
@@ -108,3 +108,21 @@ if(LIBOMPTARGET_AMDGPU_ARCH)
 endif()
 
 set(OPENMP_PTHREAD_LIB ${LLVM_PTHREAD_LIB})
+
+################################################################################
+# Looking for MPI...
+################################################################################
+find_package(MPI QUIET)
+
+set(LIBOMPTARGET_DEP_MPI_FOUND ${MPI_CXX_FOUND})
+set(LIBOMPTARGET_DEP_MPI_LIBRARIES ${MPI_CXX_LIBRARIES})
+set(LIBOMPTARGET_DEP_MPI_INCLUDE_DIRS ${MPI_CXX_INCLUDE_DIRS})
+set(LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS ${MPI_CXX_COMPILE_FLAGS})
+set(LIBOMPTARGET_DEP_MPI_LINK_FLAGS ${MPI_CXX_LINK_FLAGS})
+
+mark_as_advanced(
+  LIBOMPTARGET_DEP_MPI_FOUND
+  LIBOMPTARGET_DEP_MPI_LIBRARIES
+  LIBOMPTARGET_DEP_MPI_INCLUDE_DIRS
+  LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS
+  LIBOMPTARGET_DEP_MPI_LINK_FLAGS)
diff --git a/offload/plugins-nextgen/mpi/CMakeLists.txt b/offload/plugins-nextgen/mpi/CMakeLists.txt
new file mode 100644
index 00000000000000..9fa9b9efbb22ff
--- /dev/null
+++ b/offload/plugins-nextgen/mpi/CMakeLists.txt
@@ -0,0 +1,110 @@
+##===----------------------------------------------------------------------===##
+#
+# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+# See https://llvm.org/LICENSE.txt for license information.
+# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+#
+##===----------------------------------------------------------------------===##
+#
+# Build a plugin for an MPI machine if available.
+#
+##===----------------------------------------------------------------------===##
+if (NOT(CMAKE_SYSTEM_PROCESSOR MATCHES "(x86_64)|(ppc64le)$" AND CMAKE_SYSTEM_NAME MATCHES "Linux"))
+  libomptarget_say("Not building MPI offloading plugin: only support MPI in Linux x86_64 or ppc64le hosts.")
+  return()
+elseif (NOT LIBOMPTARGET_DEP_LIBFFI_FOUND)
+  libomptarget_say("Not building MPI offloading plugin: libffi dependency not found.")
+  return()
+elseif(NOT LIBOMPTARGET_DEP_MPI_FOUND)
+  libomptarget_say("Not building MPI offloading plugin: MPI not found in system.")
+  return()
+endif()
+
+libomptarget_say("Building MPI NextGen offloading plugin.")
+
+# Create the library and add the default arguments.
+add_target_library(omptarget.rtl.mpi MPI)
+
+target_sources(omptarget.rtl.mpi PRIVATE
+  src/EventSystem.cpp
+  src/rtl.cpp
+)
+
+if(FFI_STATIC_LIBRARIES)
+  target_link_libraries(omptarget.rtl.mpi PRIVATE FFI::ffi_static)
+else()
+  target_link_libraries(omptarget.rtl.mpi PRIVATE FFI::ffi)
+endif()
+
+target_link_libraries(omptarget.rtl.mpi PRIVATE 
+  ${LIBOMPTARGET_DEP_MPI_LIBRARIES}
+  ${LIBOMPTARGET_DEP_MPI_LINK_FLAGS}
+)
+
+# Add include directories
+target_include_directories(omptarget.rtl.mpi PRIVATE
+                           ${LIBOMPTARGET_INCLUDE_DIR})
+
+# Install plugin under the lib destination folder.
+install(TARGETS omptarget.rtl.mpi
+        LIBRARY DESTINATION "${OFFLOAD_INSTALL_LIBDIR}")
+set_target_properties(omptarget.rtl.mpi PROPERTIES 
+  INSTALL_RPATH "$ORIGIN" BUILD_RPATH "$ORIGIN:${CMAKE_CURRENT_BINARY_DIR}/..")
+
+if(LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS)
+  set_target_properties(omptarget.rtl.mpi PROPERTIES
+                        COMPILE_FLAGS "${LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS}")
+endif()
+
+# Set C++20 as the target standard for this plugin.
+set_target_properties(omptarget.rtl.mpi
+                      PROPERTIES
+                      CXX_STANDARD 20
+                      CXX_STANDARD_REQUIRED ON)
+
+# Configure testing for the MPI plugin.
+list(APPEND LIBOMPTARGET_TESTED_PLUGINS "omptarget.rtl.mpi")
+# Report to the parent scope that we are building a plugin for MPI.
+set(LIBOMPTARGET_TESTED_PLUGINS "${LIBOMPTARGET_TESTED_PLUGINS}" PARENT_SCOPE)
+
+# Define the target specific triples and ELF machine values.
+set(LIBOMPTARGET_SYSTEM_TARGETS
+    "${LIBOMPTARGET_SYSTEM_TARGETS} x86_64-pc-linux-gnu-mpi" PARENT_SCOPE)
+
+# MPI Device Binary
+llvm_add_tool(OPENMP llvm-offload-mpi-device src/EventSystem.cpp src/MPIDeviceMain.cpp)
+
+llvm_update_compile_flags(llvm-offload-mpi-device)
+
+target_link_libraries(llvm-offload-mpi-device PRIVATE
+  ${LIBOMPTARGET_DEP_MPI_LIBRARIES}
+  ${LIBOMPTARGET_DEP_MPI_LINK_FLAGS}
+  LLVMSupport
+  omp
+)
+
+if(FFI_STATIC_LIBRARIES)
+  target_link_libraries(llvm-offload-mpi-device PRIVATE FFI::ffi_static)
+else()
+  target_link_libraries(llvm-offload-mpi-device PRIVATE FFI::ffi)
+endif()
+
+target_include_directories(llvm-offload-mpi-device PRIVATE
+  ${LIBOMPTARGET_INCLUDE_DIR}
+  ${LIBOMPTARGET_DEP_MPI_INCLUDE_DIRS}
+)
+
+if(LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS)
+  set_target_properties(llvm-offload-mpi-device PROPERTIES
+    COMPILE_FLAGS "${LIBOMPTARGET_DEP_MPI_COMPILE_FLAGS}"
+  )
+endif()
+
+set_target_properties(llvm-offload-mpi-device
+  PROPERTIES
+  CXX_STANDARD 20
+  CXX_STANDARD_REQUIRED ON
+)
+
+target_compile_definitions(llvm-offload-mpi-device PRIVATE 
+                           DEBUG_PREFIX="OFFLOAD MPI DEVICE")
diff --git a/offload/plugins-nextgen/mpi/src/EventSystem.cpp b/offload/plugins-nextgen/mpi/src/EventSystem.cpp
new file mode 100644
index 00000000000000..3fa7d5c5b64783
--- /dev/null
+++ b/offload/plugins-nextgen/mpi/src/EventSystem.cpp
@@ -0,0 +1,1049 @@
+//===------ EventSystem.cpp - Concurrent MPI communication ------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+//
+// This file contains the implementation of the MPI Event System used by the MPI
+// target runtime for concurrent communication.
+//
+//===----------------------------------------------------------------------===//
+
+#include "EventSystem.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+
+#include <ffi.h>
+#include <mpi.h>
+
+#include "Shared/Debug.h"
+#include "Shared/EnvironmentVar.h"
+#include "Shared/Utils.h"
+#include "omptarget.h"
+#include "llvm/ADT/SmallVector.h"
+#include "llvm/Support/Error.h"
+
+#include "llvm/Support/DynamicLibrary.h"
+
+using llvm::sys::DynamicLibrary;
+
+#define CHECK(expr, msg, ...)                                                  \
+  if (!(expr)) {                                                               \
+    REPORT(msg, ##__VA_ARGS__);                                                \
+    return false;                                                              \
+  }
+
+// Customizable parameters of the event system
+// =============================================================================
+// Number of execute event handlers to spawn.
+static IntEnvar NumExecEventHandlers("OMPTARGET_NUM_EXEC_EVENT_HANDLERS", 1);
+// Number of data event handlers to spawn.
+static IntEnvar NumDataEventHandlers("OMPTARGET_NUM_DATA_EVENT_HANDLERS", 1);
+// Polling rate period (us) used by event handlers.
+static IntEnvar EventPollingRate("OMPTARGET_EVENT_POLLING_RATE", 1);
+// Number of communicators to be spawned and distributed for the events.
+// Allows for parallel use of network resources.
+static Int64Envar NumMPIComms("OMPTARGET_NUM_MPI_COMMS", 10);
+// Maximum buffer size to use during data transfer.
+static Int64Envar MPIFragmentSize("OMPTARGET_MPI_FRAGMENT_SIZE", 100e6);
+
+// Helper functions
+// =============================================================================
+const char *toString(EventTypeTy Type) {
+  using enum EventTypeTy;
+
+  switch (Type) {
+  case ALLOC:
+    return "Alloc";
+  case DELETE:
+    return "Delete";
+  case RETRIEVE:
+    return "Retrieve";
+  case SUBMIT:
+    return "Submit";
+  case EXCHANGE:
+    return "Exchange";
+  case EXCHANGE_SRC:
+    return "exchangeSrc";
+  case EXCHANGE_DST:
+    return "ExchangeDst";
+  case EXECUTE:
+    return "Execute";
+  case SYNC:
+    return "Sync";
+  case LOAD_BINARY:
+    return "LoadBinary";
+  case EXIT:
+    return "Exit";
+  }
+
+  assert(false && "Every enum value must be checked on the switch above.");
+  return nullptr;
+}
+
+// Coroutine events implementation
+// =============================================================================
+void EventTy::resume() {
+  // Acquire first handle not done.
+  const CoHandleTy &RootHandle = getHandle().promise().RootHandle;
+  auto &ResumableHandle = RootHandle.promise().PrevHandle;
+  while (ResumableHandle.done()) {
+    ResumableHandle = ResumableHandle.promise().PrevHandle;
+
+    if (ResumableHandle == RootHandle)
+      break;
+  }
+
+  if (!ResumableHandle.done())
+    ResumableHandle.resume();
+}
+
+void EventTy::wait() {
+  // Advance the event progress until it is completed.
+  while (!done()) {
+    resume();
+
+    std::this_thread::sleep_for(
+        std::chrono::microseconds(EventPollingRate.get()));
+  }
+}
+
+bool EventTy::done() const { return getHandle().done(); }
+
+bool EventTy::empty() const { return !getHandle(); }
+
+llvm::Error EventTy::getError() const {
+  auto &Error = getHandle().promise().CoroutineError;
+  if (Error)
+    return std::move(*Error);
+
+  return llvm::Error::success();
+}
+
+// Helpers
+// =============================================================================
+MPIRequestManagerTy::~MPIRequestManagerTy() {
+  assert(Requests.empty() && "Requests must be fulfilled and emptied before "
+                             "destruction. Did you co_await on it?");
+}
+
+void MPIRequestManagerTy::send(const void *Buffer, int Size,
+                               MPI_Datatype Datatype) {
+  MPI_Isend(Buffer, Size, Datatype, OtherRank, Tag, Comm,
+            &Requests.emplace_back(MPI_REQUEST_NULL));
+}
+
+void MPIRequestManagerTy::sendInBatchs(void *Buffer, int Size) {
+  // Operates over many fragments of the original buffer of at most
+  // MPI_FRAGMENT_SIZE bytes.
+  char *BufferByteArray = reinterpret_cast<char *>(Buffer);
+  int64_t RemainingBytes = Size;
+  while (RemainingBytes > 0) {
+    send(&BufferByteArray[Size - RemainingBytes],
+         static_cast<int>(std::min(RemainingBytes, MPIFragmentSize.get())),
+         MPI_BYTE);
+    RemainingBytes -= MPIFragmentSize.get();
+  }
+}
+
+void MPIRequestManagerTy::receive(void *Buffer, int Size,
+                                  MPI_Datatype Datatype) {
+  MPI_Irecv(Buffer, Size, Datatype, OtherRank, Tag, Comm,
+            &Requests.emplace_back(MPI_REQUEST_NULL));
+}
+
+void MPIRequestManagerTy::receiveInBatchs(void *Buffer, int Size) {
+  // Operates over many fragments of the original buffer of at most
+  // MPI_FRAGMENT_SIZE bytes.
+  char *BufferByteArray = reinterpret_cast<char *>(Buffer);
+  int64_t RemainingBytes = Size;
+  while (RemainingBytes > 0) {
+    receive(&BufferByteArray[Size - RemainingBytes],
+            static_cast<int>(std::min(RemainingBytes, MPIFragmentSize.get())),
+            MPI_BYTE);
+    RemainingBytes -= MPIFragmentSize.get();
+  }
+}
+
+EventTy MPIRequestManagerTy::wait() {
+  int RequestsCompleted = false;
+
+  while (!RequestsCompleted) {
+    int MPIError = MPI_Testall(Requests.size(), Requests.data(),
+                               &RequestsCompleted, MPI_STATUSES_IGNORE);
+
+    if (MPIError != MPI_SUCCESS)
+      co_return createError("Waiting of MPI requests failed with code %d",
+                            MPIError);
+
+    co_await std::suspend_always{};
+  }
+
+  Requests.clear();
+
+  co_return llvm::Error::success();
+}
+
+EventTy operator co_await(MPIRequestManagerTy &RequestManager) {
+  return RequestManager.wait();
+}
+
+// Device Image Storage
+// =============================================================================
+
+struct DeviceImage : __tgt_device_image {
+  llvm::SmallVector<unsigned char, 1> ImageBuffer;
+  llvm::SmallVector<__tgt_offload_entry, 16> Entries;
+  llvm::SmallVector<char> FlattenedEntryNames;
+
+  DeviceImage() {
+    ImageStart = nullptr;
+    ImageEnd = nullptr;
+    EntriesBegin = nullptr;
+    EntriesEnd = nullptr;
+  }
+
+  DeviceImage(size_t ImageSize, size_t EntryCount)
+      : ImageBuffer(ImageSize + alignof(void *)), Entries(EntryCount) {
+    // Align the image buffer to alignof(void *).
+    ImageStart = ImageBuffer.begin();
+    std::align(alignof(void *), ImageSize, ImageStart, ImageSize);
+    ImageEnd = (void *)((size_t)ImageStart + ImageSize);
+  }
+
+  void setImageEntries(llvm::SmallVector<size_t> EntryNameSizes) {
+    // Adjust the entry names to use the flattened name buffer.
+    size_t EntryCount = Entries.size();
+    size_t TotalNameSize = 0;
+    for (size_t I = 0; I < EntryCount; I++) {
+      TotalNameSize += EntryNameSizes[I];
+    }
+    FlattenedEntryNames.resize(TotalNameSize);
+
+    for (size_t I = EntryCount; I > 0; I--) {
+      TotalNameSize -= EntryNameSizes[I - 1];
+      Entries[I - 1].name = &FlattenedEntryNames[TotalNameSize];
+    }
+
+    // Set the entries pointers.
+    EntriesBegin = Entries.begin();
+    EntriesEnd = Entries.end();
+  }
+
+  /// Get the image size.
+  size_t getSize() const {
+    return llvm::omp::target::getPtrDiff(ImageEnd, ImageStart);
+  }
+
+  /// Getter and setter for the dynamic library.
+  DynamicLibrary &getDynamicLibrary() { return DynLib; }
+  void setDynamicLibrary(const DynamicLibrary &Lib) { DynLib = Lib; }
+
+private:
+  DynamicLibrary DynLib;
+};
+
+// Event Implementations
+// =============================================================================
+
+namespace OriginEvents {
+
+EventTy allocateBuffer(MPIRequestManagerTy RequestManager, int64_t Size,
+                       void **Buffer) {
+  RequestManager.send(&Size, 1, MPI_INT64_T);
+
+  RequestManager.receive(Buffer, sizeof(void *), MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy deleteBuffer(MPIRequestManagerTy RequestManager, void *Buffer) {
+  RequestManager.send(&Buffer, sizeof(void *), MPI_BYTE);
+
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy submit(MPIRequestManagerTy RequestManager, EventDataHandleTy DataHandle,
+               void *DstBuffer, int64_t Size) {
+  RequestManager.send(&DstBuffer, sizeof(void *), MPI_BYTE);
+  RequestManager.send(&Size, 1, MPI_INT64_T);
+
+  RequestManager.sendInBatchs(DataHandle.get(), Size);
+
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy retrieve(MPIRequestManagerTy RequestManager, void *OrgBuffer,
+                 const void *DstBuffer, int64_t Size) {
+  RequestManager.send(&DstBuffer, sizeof(void *), MPI_BYTE);
+  RequestManager.send(&Size, 1, MPI_INT64_T);
+  RequestManager.receiveInBatchs(OrgBuffer, Size);
+
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy exchange(MPIRequestManagerTy RequestManager, int SrcDevice,
+                 const void *OrgBuffer, int DstDevice, void *DstBuffer,
+                 int64_t Size) {
+  // Send data to SrcDevice
+  RequestManager.send(&OrgBuffer, sizeof(void *), MPI_BYTE);
+  RequestManager.send(&Size, 1, MPI_INT64_T);
+  RequestManager.send(&DstDevice, 1, MPI_INT);
+
+  // Send data to DstDevice
+  RequestManager.OtherRank = DstDevice;
+  RequestManager.send(&DstBuffer, sizeof(void *), MPI_BYTE);
+  RequestManager.send(&Size, 1, MPI_INT64_T);
+  RequestManager.send(&SrcDevice, 1, MPI_INT);
+
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+  RequestManager.OtherRank = SrcDevice;
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy execute(MPIRequestManagerTy RequestManager, EventDataHandleTy Args,
+                uint32_t NumArgs, void *Func) {
+  RequestManager.send(&NumArgs, 1, MPI_UINT32_T);
+  RequestManager.send(Args.get(), NumArgs * sizeof(void *), MPI_BYTE);
+  RequestManager.send(&Func, sizeof(void *), MPI_BYTE);
+
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+  co_return (co_await RequestManager);
+}
+
+EventTy sync(EventTy Event) {
+  while (!Event.done())
+    co_await std::suspend_always{};
+
+  co_return llvm::Error::success();
+}
+
+EventTy loadBinary(MPIRequestManagerTy RequestManager,
+                   const __tgt_device_image *Image,
+                   llvm::SmallVector<void *> *DeviceImageAddrs) {
+  auto &[ImageStart, ImageEnd, EntriesBegin, EntriesEnd] = *Image;
+
+  // Send the target table sizes.
+  size_t ImageSize = (size_t)ImageEnd - (size_t)ImageStart;
+  size_t EntryCount = EntriesEnd - EntriesBegin;
+  llvm::SmallVector<size_t> EntryNameSizes(EntryCount);
+
+  for (size_t I = 0; I < EntryCount; I++) {
+    // Note: +1 for the terminator.
+    EntryNameSizes[I] = std::strlen(EntriesBegin[I].name) + 1;
+  }
+
+  RequestManager.send(&ImageSize, 1, MPI_UINT64_T);
+  RequestManager.send(&EntryCount, 1, MPI_UINT64_T);
+  RequestManager.send(EntryNameSizes.begin(), EntryCount, MPI_UINT64_T);
+
+  // Send the image bytes and the table entries.
+  RequestManager.send(ImageStart, ImageSize, MPI_BYTE);
+
+  for (size_t I = 0; I < EntryCount; I++) {
+    RequestManager.send(&EntriesBegin[I].addr, 1, MPI_UINT64_T);
+    RequestManager.send(EntriesBegin[I].name, EntryNameSizes[I], MPI_CHAR);
+    RequestManager.send(&EntriesBegin[I].size, 1, MPI_UINT64_T);
+    RequestManager.send(&EntriesBegin[I].flags, 1, MPI_INT32_T);
+    RequestManager.send(&EntriesBegin[I].data, 1, MPI_INT32_T);
+  }
+
+  for (size_t I = 0; I < EntryCount; I++) {
+    RequestManager.receive(&((*DeviceImageAddrs)[I]), 1, MPI_UINT64_T);
+  }
+
+  co_return (co_await RequestManager);
+}
+
+EventTy exit(MPIRequestManagerTy RequestManager) {
+  // Event completion notification
+  RequestManager.receive(nullptr, 0, MPI_BYTE);
+  co_return (co_await RequestManager);
+}
+
+} // namespace OriginEvents
+
+namespace DestinationEvents {
+
+EventTy allocateBuffer(MPIRequestManagerTy RequestManager) {
+  int64_t Size = 0;
+  RequestManager.receive(&Size, 1, MPI_INT64_T);
+
+  if (auto Error = co_await RequestManager; Error)
+    co_return Error;
+
+  void *Buffer = malloc(Size);
+  RequestManager.send(&Buffer, sizeof(void *), MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy deleteBuffer(MPIRequestManagerTy RequestManager) {
+  void *Buffer = nullptr;
+  RequestManager.receive(&Buffer, sizeof(void *), MPI_BYTE);
+
+  if (auto Error = co_await RequestManager; Error)
+    co_return Error;
+
+  free(Buffer);
+
+  // Event completion notification
+  RequestManager.send(nullptr, 0, MPI_BYTE);
+
+  co_return (co_await RequestManager);
+}
+
+EventTy submit(MPIRequestManagerTy RequestManager) {
+  void *Buffer = nullptr;
+  int64_t Size = 0;
+  RequestManager.receive(...
[truncated]

``````````

</details>


https://github.com/llvm/llvm-project/pull/90890


More information about the llvm-commits mailing list