[libc-commits] [libc] aea866c - [libc] Support concurrent RPC port access on the GPU

Joseph Huber via libc-commits libc-commits at lists.llvm.org
Fri May 5 08:12:27 PDT 2023


Author: Joseph Huber
Date: 2023-05-05T10:12:19-05:00
New Revision: aea866c12cb428eb5fe062ffa910a63daff62b01

URL: https://github.com/llvm/llvm-project/commit/aea866c12cb428eb5fe062ffa910a63daff62b01
DIFF: https://github.com/llvm/llvm-project/commit/aea866c12cb428eb5fe062ffa910a63daff62b01.diff

LOG: [libc] Support concurrent RPC port access on the GPU

Previously we used a single port to implement the RPC. This was
sufficient for single-threaded tests but can cause deadlocks when using
multiple threads. The reason for this is that GPUs make no forward
progress guarantees, so one group of threads waiting on another group of
threads can spin forever because there is no guarantee that the other
threads will continue executing. The typical workaround is to allocate
enough memory that a sufficiently large number of work groups can make
progress. As long as this number is somewhat close to the total amount of
concurrency, we can obtain reliable execution around a shared resource.
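
As a rough illustration of that rule of thumb (the numbers below are
hypothetical and not derived from any particular device; this patch simply
uses a fixed default of 64 ports):

    #include <cstdint>

    // Sketch only: size the port table to roughly the number of wavefronts
    // that can be resident at once, so no group of waiters can starve another.
    constexpr uint64_t compute_units = 64;   // e.g. CUs / SMs on the device
    constexpr uint64_t waves_per_unit = 32;  // resident wavefronts per unit
    constexpr uint64_t suggested_port_count = compute_units * waves_per_unit;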

This patch enables using multiple ports by widening the arrays to a
predetermined size and indexing into them. Empty ports are currently
found via a trivial linear scan; this should be improved in the future
for performance reasons. Portions of D148191 were applied to achieve
parallel support.
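
In outline, opening a port now walks that table and claims the first free
slot. Below is a standalone, host-side sketch of the scan; the plain bool
lock and the PortTable/try_acquire names are stand-ins for the atomic lock
and lane-mask handling in rpc.h, not the real interface:

    #include <cstdint>
    #include <optional>

    struct PortTable {
      static constexpr uint64_t port_count = 64; // mirrors default_port_count
      bool locked[port_count] = {false};

      // Stand-in for the atomic try_lock()/unlock() pair used by rpc.h.
      bool try_acquire(uint64_t index) {
        if (locked[index])
          return false;
        locked[index] = true;
        return true;
      }
      void release(uint64_t index) { locked[index] = false; }

      // Claim the first empty port, or report that every port is busy.
      std::optional<uint64_t> try_open() {
        for (uint64_t index = 0; index < port_count; ++index)
          if (try_acquire(index))
            return index;
        return std::nullopt;
      }
    };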

Depends on D149581

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D149598

Added: 
    

Modified: 
    libc/src/__support/RPC/rpc.h
    libc/src/__support/RPC/rpc_util.h
    libc/startup/gpu/amdgpu/start.cpp
    libc/startup/gpu/nvptx/start.cpp
    libc/utils/gpu/loader/Server.h
    libc/utils/gpu/loader/amdgpu/Loader.cpp
    libc/utils/gpu/loader/nvptx/Loader.cpp

Removed: 
    


################################################################################
diff --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 170f74830b141..5395c4e573328 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -69,6 +69,12 @@ struct alignas(64) Packet {
   Payload payload;
 };
 
+// TODO: This should be configured by the server and passed in. The general rule
+//       of thumb is that you should have at least as many ports as possible
+//       concurrent work items on the GPU to mitigate the lack of forward
+//       progress guarantees on the GPU.
+constexpr uint64_t default_port_count = 64;
+
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
 /// ownership of the shared buffer between both sides.
@@ -96,22 +102,31 @@ template <bool InvertInbox> struct Process {
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;
 
+  uint64_t port_count;
   uint32_t lane_size;
   cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
-  Packet *buffer;
+  Packet *packet;
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(uint32_t lane_size, void *lock, void *inbox,
-                         void *outbox, void *buffer) {
-    *this = {
-        lane_size,
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
-        reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
-        reinterpret_cast<Packet *>(buffer),
-    };
+  LIBC_INLINE void reset(uint64_t port_count, uint32_t lane_size, void *lock,
+                         void *inbox, void *outbox, void *packet) {
+    *this = {port_count,
+             lane_size,
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
+             reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
+             reinterpret_cast<Packet *>(packet)};
+  }
+
+  /// The length of the packet is flexible because the server needs to look up
+  /// the lane size at runtime. This helper indexes at the proper offset.
+  LIBC_INLINE Packet &get_packet(uint64_t index) {
+    return *reinterpret_cast<Packet *>(
+        reinterpret_cast<uint8_t *>(packet) +
+        index * align_up(sizeof(Header) + lane_size * sizeof(Buffer),
+                         alignof(Packet)));
   }
 
   /// Inverting the bits loaded from the inbox in exactly one of the pair of
@@ -190,25 +205,25 @@ template <bool InvertInbox> struct Process {
 
   /// Invokes a function across every active buffer across the total lane size.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()]);
+      fn(&packet.payload.slot[gpu::get_lane_id()]);
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i]);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i]);
     }
   }
 
   /// Alternate version that also provides the index of the current lane.
   LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
-                              uint32_t index) {
+                              Packet &packet) {
     if constexpr (is_process_gpu()) {
-      fn(&buffer[index].payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
+      fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
     } else {
       for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
-        if (buffer[index].header.mask & 1ul << i)
-          fn(&buffer[index].payload.slot[i], i);
+        if (packet.header.mask & 1ul << i)
+          fn(&packet.payload.slot[i], i);
     }
   }
 };
@@ -234,7 +249,7 @@ template <bool T> struct Port {
   template <typename A> LIBC_INLINE void recv_n(A alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
-    return process.buffer[index].header.opcode;
+    return process.get_packet(index).header.opcode;
   }
 
   LIBC_INLINE void close() { process.unlock(lane_mask, index); }
@@ -281,7 +296,7 @@ template <bool T> template <typename F> LIBC_INLINE void Port<T>::send(F fill) {
   }
 
   // Apply the \p fill function to initialize the buffer and release the memory.
-  process.invoke_rpc(fill, index);
+  process.invoke_rpc(fill, process.get_packet(index));
   out = !out;
   atomic_thread_fence(cpp::MemoryOrder::RELEASE);
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
@@ -299,7 +314,7 @@ template <bool T> template <typename U> LIBC_INLINE void Port<T>::recv(U use) {
   atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
 
   // Apply the \p use function to read the memory out of the buffer.
-  process.invoke_rpc(use, index);
+  process.invoke_rpc(use, process.get_packet(index));
   out = !out;
   process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
 }
@@ -340,7 +355,7 @@ LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
       inline_memcpy(buffer->data, ptr + idx, len);
     });
   }
-  gpu::sync_lane(process.buffer[index].header.mask);
+  gpu::sync_lane(process.get_packet(index).header.mask);
 }
 
 /// Receives an arbitrarily sized data buffer across the shared channel in
@@ -396,32 +411,34 @@ LIBC_INLINE void Port<T>::recv_n(A alloc) {
 /// participating thread.
 [[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
 Client::try_open(uint16_t opcode) {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // Once we acquire the index we need to check if we are in a valid sending
-  // state.
-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
-  }
+  // Perform a naive linear scan for a port that can be opened to send data.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // Once we acquire the index we need to check if we are in a valid sending
+    // state.
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }
 
-  if (is_first_lane(lane_mask)) {
-    buffer[index].header.opcode = opcode;
-    buffer[index].header.mask = lane_mask;
+    if (is_first_lane(lane_mask)) {
+      get_packet(index).header.opcode = opcode;
+      get_packet(index).header.mask = lane_mask;
+    }
+    gpu::sync_lane(lane_mask);
+    return Port(*this, lane_mask, index, out);
   }
-  gpu::sync_lane(lane_mask);
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
@@ -436,33 +453,36 @@ LIBC_INLINE Client::Port Client::open(uint16_t opcode) {
 /// port if it has a pending receive operation
 [[clang::convergent]] LIBC_INLINE cpp::optional<Server::Port>
 Server::try_open() {
-  constexpr uint64_t index = 0;
-  const uint64_t lane_mask = gpu::get_lane_mask();
-
-  uint32_t in = load_inbox(index);
-  uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
-
-  // The server is passive, if there is no work pending don't bother
-  // opening a port.
-  if (buffer_unavailable(in, out))
-    return cpp::nullopt;
-
-  // Attempt to acquire the lock on this index.
-  if (!try_lock(lane_mask, index))
-    return cpp::nullopt;
-
-  // The mailbox state must be read with the lock held.
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-
-  in = load_inbox(index);
-  out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+  // Perform a naive linear scan for a port that has a pending request.
+  for (uint64_t index = 0; index < port_count; ++index) {
+    uint32_t in = load_inbox(index);
+    uint32_t out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    // The server is passive, if there is no work pending don't bother
+    // opening a port.
+    if (buffer_unavailable(in, out))
+      continue;
+
+    // Attempt to acquire the lock on this index.
+    uint64_t lane_mask = gpu::get_lane_mask();
+    // Attempt to acquire the lock on this index.
+    if (!try_lock(lane_mask, index))
+      continue;
+
+    // The mailbox state must be read with the lock held.
+    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+    in = load_inbox(index);
+    out = outbox[index].load(cpp::MemoryOrder::RELAXED);
+
+    if (buffer_unavailable(in, out)) {
+      unlock(lane_mask, index);
+      continue;
+    }
 
-  if (buffer_unavailable(in, out)) {
-    unlock(lane_mask, index);
-    return cpp::nullopt;
+    return Port(*this, lane_mask, index, out);
   }
-
-  return Port(*this, lane_mask, index, out);
+  return cpp::nullopt;
 }
 
 LIBC_INLINE Server::Port Server::open() {

diff --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index 224723ef20c92..c6282e40c903e 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -49,6 +49,11 @@ LIBC_INLINE constexpr bool is_process_gpu() {
 #endif
 }
 
+/// Return \p val aligned "upwards" according to \p align.
+template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
+  return ((val + V(align) - 1) / V(align)) * V(align);
+}
+
 } // namespace rpc
 } // namespace __llvm_libc
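
The align_up helper above is what the new Process::get_packet in rpc.h uses
to compute a per-packet stride, since the server only learns the lane size
at runtime. A standalone sketch of that offset arithmetic follows; the
Header/Buffer sizes and the alignment below are placeholders, not the real
layouts:

    #include <cstdint>

    // Round val up to the next multiple of align (same shape as rpc::align_up).
    constexpr uint64_t align_up(uint64_t val, uint64_t align) {
      return ((val + align - 1) / align) * align;
    }

    constexpr uint64_t header_size = 16;  // stand-in for sizeof(rpc::Header)
    constexpr uint64_t buffer_size = 64;  // stand-in for sizeof(rpc::Buffer)
    constexpr uint64_t packet_align = 64; // stand-in for alignof(rpc::Packet)

    // Byte offset of packet 'index' when each packet holds 'lane_size' buffers.
    constexpr uint64_t packet_offset(uint64_t index, uint64_t lane_size) {
      return index *
             align_up(header_size + lane_size * buffer_size, packet_align);
    }

    static_assert(packet_offset(2, 32) == 2 * 2112,
                  "stride is align_up(16 + 32 * 64, 64) = 2112 bytes");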
 

diff --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
index d1dfc7b8c11da..84adb3b97527b 100644
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
 
 namespace __llvm_libc {
 
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
 
 extern "C" uintptr_t __init_array_start[];
 extern "C" uintptr_t __init_array_end[];
@@ -43,7 +43,8 @@ extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void
 _begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
   // We need to set up the RPC client first in case any of the constructors
   // require it.
-  __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+  __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+                                 __llvm_libc::gpu::get_lane_size(),
                                  &__llvm_libc::lock, in, out, buffer);
 
   // We want the fini array callbacks to be run after other atexit

diff --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
index 83453ae1e47a6..1d366dc829dfb 100644
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -15,7 +15,7 @@ extern "C" int main(int argc, char **argv, char **envp);
 
 namespace __llvm_libc {
 
-static cpp::Atomic<uint32_t> lock = 0;
+static cpp::Atomic<uint32_t> lock[rpc::default_port_count] = {0};
 
 extern "C" {
 // Nvidia's 'nvlink' linker does not provide these symbols. We instead need
@@ -47,7 +47,8 @@ extern "C" [[gnu::visibility("protected"), clang::nvptx_kernel]] void
 _begin(int argc, char **argv, char **env, void *in, void *out, void *buffer) {
   // We need to set up the RPC client first in case any of the constructors
   // require it.
-  __llvm_libc::rpc::client.reset(__llvm_libc::gpu::get_lane_size(),
+  __llvm_libc::rpc::client.reset(__llvm_libc::rpc::default_port_count,
+                                 __llvm_libc::gpu::get_lane_size(),
                                  &__llvm_libc::lock, in, out, buffer);
 
   // We want the fini array callbacks to be run after other atexit

diff --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
index 6ffb32955c895..f77bf256618a4 100644
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -19,47 +19,51 @@
 
 static __llvm_libc::rpc::Server server;
 
-static __llvm_libc::cpp::Atomic<uint32_t> lock;
+static __llvm_libc::cpp::Atomic<uint32_t>
+    lock[__llvm_libc::rpc::default_port_count] = {0};
 
 /// Queries the RPC client at least once and performs server-side work if there
 /// are any active requests.
 void handle_server() {
-  auto port = server.try_open();
-  if (!port)
-    return;
+  // Continue servicing the client until there is no work left and we return.
+  for (;;) {
+    auto port = server.try_open();
+    if (!port)
+      return;
 
-  switch (port->get_opcode()) {
-  case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
-    uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
-    char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
-    port->recv_n([&](uint64_t size, uint32_t id) {
-      str_size[id] = size;
-      strs[id] = new char[size];
-      return strs[id];
-    });
-    for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
-      if (strs[i]) {
-        fwrite(strs[i], str_size[i], 1, stderr);
-        delete[] strs[i];
+    switch (port->get_opcode()) {
+    case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
+      uint64_t str_size[__llvm_libc::rpc::MAX_LANE_SIZE] = {0};
+      char *strs[__llvm_libc::rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n([&](uint64_t size, uint32_t id) {
+        str_size[id] = size;
+        strs[id] = new char[size];
+        return strs[id];
+      });
+      for (uint64_t i = 0; i < __llvm_libc::rpc::MAX_LANE_SIZE; ++i) {
+        if (strs[i]) {
+          fwrite(strs[i], str_size[i], 1, stderr);
+          delete[] strs[i];
+        }
       }
+      break;
     }
-    break;
-  }
-  case __llvm_libc::rpc::Opcode::EXIT: {
-    port->recv([](__llvm_libc::rpc::Buffer *buffer) {
-      exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
-    });
-    break;
-  }
-  case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
-    port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
-      reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
-    });
-    break;
-  }
-  default:
-    port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+    case __llvm_libc::rpc::Opcode::EXIT: {
+      port->recv([](__llvm_libc::rpc::Buffer *buffer) {
+        exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
+      });
+      break;
+    }
+    case __llvm_libc::rpc::Opcode::TEST_INCREMENT: {
+      port->recv_and_send([](__llvm_libc::rpc::Buffer *buffer) {
+        reinterpret_cast<uint64_t *>(buffer->data)[0] += 1;
+      });
+      break;
+    }
+    default:
+      port->recv([](__llvm_libc::rpc::Buffer *buffer) {});
+    }
+    port->close();
   }
-  port->close();
 }
 #endif

diff --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
index ee12d6d63ffb8..07fa1ae7fe161 100644
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -330,6 +330,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   hsa_amd_memory_fill(dev_ret, 0, sizeof(int));
 
   // Allocate finegrained memory for the RPC server and client to share.
+  uint64_t port_size = __llvm_libc::rpc::default_port_count;
   uint32_t wavefront_size = 0;
   if (hsa_status_t err = hsa_agent_get_info(
           dev_agent, HSA_AGENT_INFO_WAVEFRONT_SIZE, &wavefront_size))
@@ -338,18 +339,19 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   void *server_outbox;
   void *buffer;
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
-          finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+          finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
           /*flags=*/0, &server_inbox))
     handle_error(err);
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
-          finegrained_pool, sizeof(__llvm_libc::cpp::Atomic<int>),
+          finegrained_pool, port_size * sizeof(__llvm_libc::cpp::Atomic<int>),
           /*flags=*/0, &server_outbox))
     handle_error(err);
   if (hsa_status_t err = hsa_amd_memory_pool_allocate(
           finegrained_pool,
-          align_up(sizeof(__llvm_libc::rpc::Header) +
-                       (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
-                   alignof(__llvm_libc::rpc::Packet)),
+          port_size *
+              align_up(sizeof(__llvm_libc::rpc::Header) +
+                           (wavefront_size * sizeof(__llvm_libc::rpc::Buffer)),
+                       alignof(__llvm_libc::rpc::Packet)),
           /*flags=*/0, &buffer))
     handle_error(err);
   hsa_amd_agents_allow_access(1, &dev_agent, nullptr, server_inbox);
@@ -357,7 +359,8 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   hsa_amd_agents_allow_access(1, &dev_agent, nullptr, buffer);
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(wavefront_size, &lock, server_inbox, server_outbox, buffer);
+  server.reset(port_size, wavefront_size, &lock, server_inbox, server_outbox,
+               buffer);
 
   // Obtain a queue with the minimum (power of two) size, used to send commands
   // to the HSA runtime and launch execution on the device.

diff --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
index ca18da939f4cb..314f5a8055fb9 100644
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -246,18 +246,22 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
   if (CUresult err = cuMemsetD32(dev_ret, 0, 1))
     handle_error(err);
 
+  uint64_t port_size = __llvm_libc::rpc::default_port_count;
   uint32_t warp_size = 32;
-  void *server_inbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
-  void *server_outbox = allocator(sizeof(__llvm_libc::cpp::Atomic<int>));
-  void *buffer =
-      allocator(align_up(sizeof(__llvm_libc::rpc::Header) +
-                             (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
-                         alignof(__llvm_libc::rpc::Packet)));
+  void *server_inbox =
+      allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+  void *server_outbox =
+      allocator(port_size * sizeof(__llvm_libc::cpp::Atomic<int>));
+  void *buffer = allocator(
+      port_size * align_up(sizeof(__llvm_libc::rpc::Header) +
+                               (warp_size * sizeof(__llvm_libc::rpc::Buffer)),
+                           alignof(__llvm_libc::rpc::Packet)));
   if (!server_inbox || !server_outbox || !buffer)
     handle_error("Failed to allocate memory the RPC client / server.");
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(warp_size, &lock, server_inbox, server_outbox, buffer);
+  server.reset(port_size, warp_size, &lock, server_inbox, server_outbox,
+               buffer);
 
   LaunchParameters single_threaded_params = {1, 1, 1, 1, 1, 1};
   // Call the kernel to


        

