[libc-commits] [libc] d0ff5e4 - [libc] Update RPC interface for system utilities on the GPU

Joseph Huber via libc-commits libc-commits at lists.llvm.org
Wed Apr 19 18:02:41 PDT 2023


Author: Joseph Huber
Date: 2023-04-19T20:02:31-05:00
New Revision: d0ff5e40308ee33936bfcc131f33adb4066b946f

URL: https://github.com/llvm/llvm-project/commit/d0ff5e40308ee33936bfcc131f33adb4066b946f
DIFF: https://github.com/llvm/llvm-project/commit/d0ff5e40308ee33936bfcc131f33adb4066b946f.diff

LOG: [libc] Update RPC interface for system utilities on the GPU

This patch reworks the RPC interface to allow more generic memory
operations using the shared better. This patch decomposes the entire RPC
interface into opening a port and calling `send` or `recv` on it.

The `send` function sends a single packet of the length of the buffer.
The `recv` function is paired with the `send` call to then use the data.
So, any aribtrary combination of sending packets is possible. The only
restriction is that the client initiates the exchange with a `send`
while the server consumes it with a `recv`.

The operation of this is driven by two independent state machines that
tracks the buffer ownership during loads / stores. We keep track of two
so that we can transition between a send state and a recv state without
an extra wait. State transitions are observed via bit toggling, e.g.

This interface supports an efficient `send -> ack -> send -> ack -> send`
interface and allows for the last send to be ignored without checking
the ack.

A following patch will add some more comprehensive testing to this interface. I
I informally made an RPC call that simply incremented an integer and it took
roughly 10 microsends to complete an RPC call.

Reviewed By: jdoerfert

Differential Revision: https://reviews.llvm.org/D148288

Added: 
    libc/utils/gpu/loader/Server.h

Modified: 
    libc/src/__support/OSUtil/gpu/io.cpp
    libc/src/__support/OSUtil/gpu/io.h
    libc/src/__support/OSUtil/gpu/quick_exit.cpp
    libc/src/__support/RPC/rpc.h
    libc/startup/gpu/amdgpu/start.cpp
    libc/startup/gpu/nvptx/start.cpp
    libc/utils/gpu/loader/CMakeLists.txt
    libc/utils/gpu/loader/amdgpu/CMakeLists.txt
    libc/utils/gpu/loader/amdgpu/Loader.cpp
    libc/utils/gpu/loader/nvptx/CMakeLists.txt
    libc/utils/gpu/loader/nvptx/Loader.cpp

Removed: 
    


################################################################################
diff  --git a/libc/src/__support/OSUtil/gpu/io.cpp b/libc/src/__support/OSUtil/gpu/io.cpp
index 2570a1bfc25b4..13ab96dacc284 100644
--- a/libc/src/__support/OSUtil/gpu/io.cpp
+++ b/libc/src/__support/OSUtil/gpu/io.cpp
@@ -14,34 +14,10 @@
 
 namespace __llvm_libc {
 
-namespace internal {
-
-static constexpr size_t BUFFER_SIZE = sizeof(rpc::Buffer) - sizeof(uint64_t);
-static constexpr size_t MAX_STRING_SIZE = BUFFER_SIZE;
-
-LIBC_INLINE void send_null_terminated(cpp::string_view src) {
-  rpc::client.run(
-      [&](rpc::Buffer *buffer) {
-        buffer->data[0] = rpc::Opcode::PRINT_TO_STDERR;
-        char *data = reinterpret_cast<char *>(&buffer->data[1]);
-        inline_memcpy(data, src.data(), src.size());
-        data[src.size()] = '\0';
-      },
-      [](rpc::Buffer *) { /* void */ });
-}
-
-} // namespace internal
-
 void write_to_stderr(cpp::string_view msg) {
-  bool send_empty_string = true;
-  for (; !msg.empty();) {
-    const auto chunk = msg.substr(0, internal::MAX_STRING_SIZE);
-    internal::send_null_terminated(chunk);
-    msg.remove_prefix(chunk.size());
-    send_empty_string = false;
-  }
-  if (send_empty_string)
-    internal::send_null_terminated("");
+  rpc::Port port = rpc::client.open(rpc::PRINT_TO_STDERR);
+  port.send_n(msg.data(), msg.size() + 1);
+  port.close();
 }
 
 } // namespace __llvm_libc

diff  --git a/libc/src/__support/OSUtil/gpu/io.h b/libc/src/__support/OSUtil/gpu/io.h
index a27d1a114506f..a7d15464ccedc 100644
--- a/libc/src/__support/OSUtil/gpu/io.h
+++ b/libc/src/__support/OSUtil/gpu/io.h
@@ -16,10 +16,6 @@ namespace __llvm_libc {
 
 void write_to_stderr(cpp::string_view msg);
 
-LIBC_INLINE void write_to_stderr(const char *msg) {
-  write_to_stderr(cpp::string_view(msg));
-}
-
 } // namespace __llvm_libc
 
 #endif // LLVM_LIBC_SRC_SUPPORT_OSUTIL_LINUX_IO_H

diff  --git a/libc/src/__support/OSUtil/gpu/quick_exit.cpp b/libc/src/__support/OSUtil/gpu/quick_exit.cpp
index 3fab438a357a5..b56af4a30d9d4 100644
--- a/libc/src/__support/OSUtil/gpu/quick_exit.cpp
+++ b/libc/src/__support/OSUtil/gpu/quick_exit.cpp
@@ -17,14 +17,11 @@
 namespace __llvm_libc {
 
 void quick_exit(int status) {
-  // TODO: Support asynchronous calls so we don't wait and exit from the GPU
-  // immediately.
-  rpc::client.run(
-      [&](rpc::Buffer *buffer) {
-        buffer->data[0] = rpc::Opcode::EXIT;
-        buffer->data[1] = status;
-      },
-      [](rpc::Buffer *) { /* void */ });
+  rpc::Port port = rpc::client.open(rpc::EXIT);
+  port.send([&](rpc::Buffer *buffer) {
+    reinterpret_cast<uint32_t *>(buffer->data)[0] = status;
+  });
+  port.close();
 
 #if defined(LIBC_TARGET_ARCH_IS_NVPTX)
   asm("exit;" ::: "memory");

diff  --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 196a62daa970c..e73dbaaf5a21d 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -20,48 +20,119 @@
 
 #include "rpc_util.h"
 #include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/optional.h"
 #include "src/__support/GPU/utils.h"
+#include "src/string/memory_utils/memcpy_implementations.h"
 
 #include <stdint.h>
 
 namespace __llvm_libc {
 namespace rpc {
 
-/// A list of opcodes that we use to invoke certain actions on the server. We
-/// reserve the first 255 values for internal libc usage.
-enum Opcode : uint64_t {
+/// A list of opcodes that we use to invoke certain actions on the server.
+enum Opcode : uint16_t {
   NOOP = 0,
   PRINT_TO_STDERR = 1,
   EXIT = 2,
-  LIBC_LAST = (1UL << 8) - 1,
 };
 
 /// A fixed size channel used to communicate between the RPC client and server.
-struct Buffer {
-  uint64_t data[8];
+struct alignas(64) Buffer {
+  uint8_t data[62];
+  uint16_t opcode;
 };
+static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");
 
 /// A common process used to synchronize communication between a client and a
 /// server. The process contains an inbox and an outbox used for signaling
-/// ownership of the shared buffer.
+/// ownership of the shared buffer between both sides.
+///
+/// This process is designed to support mostly arbitrary combinations of 'send'
+/// and 'recv' operations on the shared buffer as long as these operations are
+/// mirrored by the other process. These operations exchange ownership of the
+/// fixed-size buffer between the users of the protocol. The assumptions when
+/// using this process are as follows:
+///   - The client will always start with a 'send' operation
+///   - The server will always start with a 'recv' operation
+///   - For every 'send' / 'recv' call on one side of the process there is a
+///     mirrored 'recv' / 'send' call.
+///
+/// The communication protocol is organized as a pair of two-state state
+/// machines. One state machine tracks outgoing sends and the other tracks
+/// incoming receives. For example, a 'send' operation uses its input 'Ack' bit
+/// and its output 'Data' bit. If these bits are equal the sender owns the
+/// buffer, otherwise the receiver owns the buffer and we wait. Similarly, a
+/// 'recv' operation uses its output 'Ack' bit and input 'Data' bit. If these
+/// bits are not equal the receiver owns the buffer, otherwise the sender owns
+/// the buffer.
 struct Process {
   LIBC_INLINE Process() = default;
   LIBC_INLINE Process(const Process &) = default;
   LIBC_INLINE Process &operator=(const Process &) = default;
   LIBC_INLINE ~Process() = default;
 
+  static constexpr uint32_t Data = 0b01;
+  static constexpr uint32_t Ack = 0b10;
+
+  cpp::Atomic<uint32_t> *lock;
   cpp::Atomic<uint32_t> *inbox;
   cpp::Atomic<uint32_t> *outbox;
   Buffer *buffer;
 
   /// Initialize the communication channels.
-  LIBC_INLINE void reset(void *inbox, void *outbox, void *buffer) {
+  LIBC_INLINE void reset(void *lock, void *inbox, void *outbox, void *buffer) {
     *this = {
+        reinterpret_cast<cpp::Atomic<uint32_t> *>(lock),
         reinterpret_cast<cpp::Atomic<uint32_t> *>(inbox),
         reinterpret_cast<cpp::Atomic<uint32_t> *>(outbox),
         reinterpret_cast<Buffer *>(buffer),
     };
   }
+
+  /// Determines if this process owns the buffer for a send. We can send data if
+  /// the output data bit matches the input acknowledge bit.
+  LIBC_INLINE static bool can_send_data(uint32_t in, uint32_t out) {
+    return bool(in & Process::Ack) == bool(out & Process::Data);
+  }
+
+  /// Determines if this process owns the buffer for a receive. We can send data
+  /// if the output acknowledge bit does not match the input data bit.
+  LIBC_INLINE static bool can_recv_data(uint32_t in, uint32_t out) {
+    return bool(in & Process::Data) != bool(out & Process::Ack);
+  }
+};
+
+/// The port provides the interface to communicate between the multiple
+/// processes. A port is conceptually an index into the memory provided by the
+/// underlying process that is guarded by a lock bit.
+struct Port {
+  // TODO: This should be move-only.
+  LIBC_INLINE Port(Process &process, uint64_t index, uint32_t out)
+      : process(process), index(index), out(out) {}
+  LIBC_INLINE Port(const Port &) = default;
+  LIBC_INLINE Port &operator=(const Port &) = delete;
+  LIBC_INLINE ~Port() = default;
+
+  template <typename U> LIBC_INLINE void recv(U use);
+  template <typename F> LIBC_INLINE void send(F fill);
+  template <typename F, typename U>
+  LIBC_INLINE void send_and_recv(F fill, U use);
+  template <typename W> LIBC_INLINE void recv_and_send(W work);
+  LIBC_INLINE void send_n(const void *src, uint64_t size);
+  template <typename A> LIBC_INLINE void recv_n(A alloc);
+
+  LIBC_INLINE uint16_t get_opcode() const {
+    return process.buffer[index].opcode;
+  }
+
+  LIBC_INLINE void close() {
+    process.lock[index].store(0, cpp::MemoryOrder::RELAXED);
+  }
+
+private:
+  Process &process;
+  uint64_t index;
+  uint32_t out;
 };
 
 /// The RPC client used to make requests to the server.
@@ -71,7 +142,8 @@ struct Client : public Process {
   LIBC_INLINE Client &operator=(const Client &) = default;
   LIBC_INLINE ~Client() = default;
 
-  template <typename F, typename U> LIBC_INLINE void run(F fill, U use);
+  LIBC_INLINE cpp::optional<Port> try_open(uint16_t opcode);
+  LIBC_INLINE Port open(uint16_t opcode);
 };
 
 /// The RPC server used to respond to the client.
@@ -81,88 +153,154 @@ struct Server : public Process {
   LIBC_INLINE Server &operator=(const Server &) = default;
   LIBC_INLINE ~Server() = default;
 
-  template <typename W, typename C> LIBC_INLINE bool handle(W work, C clean);
+  LIBC_INLINE cpp::optional<Port> try_open();
+  LIBC_INLINE Port open();
 };
 
-/// Run the RPC client protocol to communicate with the server. We perform the
-/// following high level actions to complete a communication:
-///   - Apply \p fill to the shared buffer and write 1 to the outbox.
-///   - Wait until the inbox is 1.
-///   - Apply \p use to the shared buffer and write 0 to the outbox.
-///   - Wait until the inbox is 0.
-template <typename F, typename U> LIBC_INLINE void Client::run(F fill, U use) {
-  bool in = inbox->load(cpp::MemoryOrder::RELAXED);
-  bool out = outbox->load(cpp::MemoryOrder::RELAXED);
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-  // Apply the \p fill to the buffer and signal the server.
-  if (!in & !out) {
-    fill(buffer);
-    atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    outbox->store(1, cpp::MemoryOrder::RELAXED);
-    out = 1;
+/// Applies \p fill to the shared buffer and initiates a send operation.
+template <typename F> LIBC_INLINE void Port::send(F fill) {
+  uint32_t in = process.inbox[index].load(cpp::MemoryOrder::RELAXED);
+
+  // We need to wait until we own the buffer before sending.
+  while (!Process::can_send_data(in, out)) {
+    sleep_briefly();
+    in = process.inbox[index].load(cpp::MemoryOrder::RELAXED);
   }
-  // Wait for the server to work on the buffer and respond.
-  if (!in & out) {
-    while (!in) {
-      sleep_briefly();
-      in = inbox->load(cpp::MemoryOrder::RELAXED);
-    }
-    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+  // Apply the \p fill function to initialize the buffer and release the memory.
+  fill(&process.buffer[index]);
+  out = out ^ Process::Data;
+  atomic_thread_fence(cpp::MemoryOrder::RELEASE);
+  process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
+}
+
+/// Applies \p use to the shared buffer and acknowledges the send.
+template <typename U> LIBC_INLINE void Port::recv(U use) {
+  uint32_t in = process.inbox[index].load(cpp::MemoryOrder::RELAXED);
+
+  // We need to wait until we own the buffer before receiving.
+  while (!Process::can_recv_data(in, out)) {
+    sleep_briefly();
+    in = process.inbox[index].load(cpp::MemoryOrder::RELAXED);
   }
-  // Apply \p use to the buffer and signal the server.
-  if (in & out) {
-    use(buffer);
-    atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    outbox->store(0, cpp::MemoryOrder::RELAXED);
-    out = 0;
+  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+  // Apply the \p use function to read the memory out of the buffer.
+  use(&process.buffer[index]);
+  out = out ^ Process::Ack;
+  process.outbox[index].store(out, cpp::MemoryOrder::RELAXED);
+}
+
+/// Combines a send and receive into a single function.
+template <typename F, typename U>
+LIBC_INLINE void Port::send_and_recv(F fill, U use) {
+  send(fill);
+  recv(use);
+}
+
+/// Combines a receive and send operation into a single function. The \p work
+/// function modifies the buffer in-place and the send is only used to initiate
+/// the copy back.
+template <typename W> LIBC_INLINE void Port::recv_and_send(W work) {
+  recv(work);
+  send([](Buffer *) { /* no-op */ });
+}
+
+/// Sends an arbitrarily sized data buffer \p src across the shared channel in
+/// multiples of the packet length.
+LIBC_INLINE void Port::send_n(const void *src, uint64_t size) {
+  // TODO: We could send the first bytes in this call and potentially save an
+  // extra send operation.
+  send([=](Buffer *buffer) { buffer->data[0] = size; });
+  const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
+  for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+    send([=](Buffer *buffer) {
+      const uint64_t len =
+          size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
+      inline_memcpy(buffer->data, ptr + idx, len);
+    });
   }
-  // Wait for the server to signal the end of the protocol.
-  if (in & !out) {
-    while (in) {
-      sleep_briefly();
-      in = inbox->load(cpp::MemoryOrder::RELAXED);
-    }
-    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+}
+
+/// Receives an arbitrarily sized data buffer across the shared channel in
+/// multiples of the packet length. The \p alloc function is called with the
+/// size of the data so that we can initialize the size of the \p dst buffer.
+template <typename A> LIBC_INLINE void Port::recv_n(A alloc) {
+  uint64_t size = 0;
+  recv([&](Buffer *buffer) { size = buffer->data[0]; });
+  uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size));
+  for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
+    recv([=](Buffer *buffer) {
+      uint64_t len =
+          size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
+      inline_memcpy(dst + idx, buffer->data, len);
+    });
   }
 }
 
-/// Run the RPC server protocol to communicate with the client. This is
-/// non-blocking and only checks the server a single time. We perform the
-/// following high level actions to complete a communication:
-///   - Query if the inbox is 1 and exit if there is no work to do.
-///   - Apply \p work to the shared buffer and write 1 to the outbox.
-///   - Wait until the inbox is 0.
-///   - Apply \p clean to the shared buffer and write 0 to the outbox.
-template <typename W, typename C>
-LIBC_INLINE bool Server::handle(W work, C clean) {
-  bool in = inbox->load(cpp::MemoryOrder::RELAXED);
-  bool out = outbox->load(cpp::MemoryOrder::RELAXED);
-  atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
-  // There is no work to do, exit early.
-  if (!in & !out)
-    return false;
-  // Apply \p work to the buffer and signal the client.
-  if (in & !out) {
-    work(buffer);
-    atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    outbox->store(1, cpp::MemoryOrder::RELAXED);
-    out = 1;
+/// Attempts to open a port to use as the client. The client can only open a
+/// port if we find an index that is in a valid sending state. That is, there
+/// are send operations pending that haven't been serviced on this port. Each
+/// port instance uses an associated \p opcode to tell the server what to do.
+LIBC_INLINE cpp::optional<Port> Client::try_open(uint16_t opcode) {
+  // Attempt to acquire the lock on this index.
+  if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED))
+    return cpp::nullopt;
+
+  uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED);
+  uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED);
+
+  // Once we acquire the index we need to check if we are in a valid sending
+  // state.
+  if (!can_send_data(in, out)) {
+    lock->store(0, cpp::MemoryOrder::RELAXED);
+    return cpp::nullopt;
   }
-  // Wait for the client to use the buffer and respond.
-  if (in & out) {
-    while (in)
-      in = inbox->load(cpp::MemoryOrder::RELAXED);
-    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
+
+  buffer->opcode = opcode;
+  return Port(*this, 0, out);
+}
+
+LIBC_INLINE Port Client::open(uint16_t opcode) {
+  for (;;) {
+    if (cpp::optional<Port> p = try_open(opcode))
+      return p.value();
+    sleep_briefly();
   }
-  // Clean up the buffer and signal the end of the protocol.
-  if (!in & out) {
-    clean(buffer);
-    atomic_thread_fence(cpp::MemoryOrder::RELEASE);
-    outbox->store(0, cpp::MemoryOrder::RELAXED);
-    out = 0;
+}
+
+/// Attempts to open a port to use as the server. The server can only open a
+/// port if it has a pending receive operation
+LIBC_INLINE cpp::optional<Port> Server::try_open() {
+  uint32_t in = inbox->load(cpp::MemoryOrder::RELAXED);
+  uint32_t out = outbox->load(cpp::MemoryOrder::RELAXED);
+
+  // The server is passive, if there is no work pending don't bother
+  // opening a port.
+  if (!can_recv_data(in, out))
+    return cpp::nullopt;
+
+  // Attempt to acquire the lock on this index.
+  if (lock->fetch_or(1, cpp::MemoryOrder::RELAXED))
+    return cpp::nullopt;
+
+  in = inbox->load(cpp::MemoryOrder::RELAXED);
+  out = outbox->load(cpp::MemoryOrder::RELAXED);
+
+  if (!can_recv_data(in, out)) {
+    lock->store(0, cpp::MemoryOrder::RELAXED);
+    return cpp::nullopt;
   }
 
-  return true;
+  return Port(*this, 0, out);
+}
+
+LIBC_INLINE Port Server::open() {
+  for (;;) {
+    if (cpp::optional<Port> p = try_open())
+      return p.value();
+    sleep_briefly();
+  }
 }
 
 } // namespace rpc

diff  --git a/libc/startup/gpu/amdgpu/start.cpp b/libc/startup/gpu/amdgpu/start.cpp
index 9915dff94f6fa..66f06b086a233 100644
--- a/libc/startup/gpu/amdgpu/start.cpp
+++ b/libc/startup/gpu/amdgpu/start.cpp
@@ -8,12 +8,14 @@
 
 #include "src/__support/RPC/rpc_client.h"
 
+static __llvm_libc::cpp::Atomic<uint32_t> lock;
+
 extern "C" int main(int argc, char **argv, char **envp);
 
 extern "C" [[gnu::visibility("protected"), clang::amdgpu_kernel]] void
 _start(int argc, char **argv, char **envp, int *ret, void *in, void *out,
        void *buffer) {
-  __llvm_libc::rpc::client.reset(in, out, buffer);
+  __llvm_libc::rpc::client.reset(&lock, in, out, buffer);
 
   __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED);
 }

diff  --git a/libc/startup/gpu/nvptx/start.cpp b/libc/startup/gpu/nvptx/start.cpp
index b09d6f685f212..9939c6e21330b 100644
--- a/libc/startup/gpu/nvptx/start.cpp
+++ b/libc/startup/gpu/nvptx/start.cpp
@@ -8,12 +8,14 @@
 
 #include "src/__support/RPC/rpc_client.h"
 
+static __llvm_libc::cpp::Atomic<uint32_t> lock;
+
 extern "C" int main(int argc, char **argv, char **envp);
 
 extern "C" [[gnu::visibility("protected")]] __attribute__((nvptx_kernel)) void
 _start(int argc, char **argv, char **envp, int *ret, void *in, void *out,
        void *buffer) {
-  __llvm_libc::rpc::client.reset(in, out, buffer);
+  __llvm_libc::rpc::client.reset(&lock, in, out, buffer);
 
   __atomic_fetch_or(ret, main(argc, argv, envp), __ATOMIC_RELAXED);
 }

diff  --git a/libc/utils/gpu/loader/CMakeLists.txt b/libc/utils/gpu/loader/CMakeLists.txt
index f037d3f3e205a..3f63ef0bc90e4 100644
--- a/libc/utils/gpu/loader/CMakeLists.txt
+++ b/libc/utils/gpu/loader/CMakeLists.txt
@@ -1,5 +1,8 @@
 add_library(gpu_loader OBJECT Main.cpp)
-target_include_directories(gpu_loader PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
+target_include_directories(gpu_loader PUBLIC
+  ${CMAKE_CURRENT_SOURCE_DIR}
+  ${LIBC_SOURCE_DIR}
+)
 
 find_package(hsa-runtime64 QUIET 1.2.0 HINTS ${CMAKE_INSTALL_PREFIX} PATHS /opt/rocm)
 if(hsa-runtime64_FOUND)

diff  --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
new file mode 100644
index 0000000000000..12565b0489ee2
--- /dev/null
+++ b/libc/utils/gpu/loader/Server.h
@@ -0,0 +1,51 @@
+//===-- Generic RPC server interface --------------------------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_LIBC_UTILS_GPU_LOADER_RPC_H
+#define LLVM_LIBC_UTILS_GPU_LOADER_RPC_H
+
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <stddef.h>
+
+#include "src/__support/RPC/rpc.h"
+
+static __llvm_libc::rpc::Server server;
+
+static __llvm_libc::cpp::Atomic<uint32_t> lock;
+
+/// Queries the RPC client at least once and performs server-side work if there
+/// are any active requests.
+void handle_server() {
+  auto port = server.try_open();
+  if (!port)
+    return;
+
+  switch (port->get_opcode()) {
+  case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
+    void *str = nullptr;
+    port->recv_n([&](uint64_t size) { return str = malloc(size); });
+    fputs(reinterpret_cast<char *>(str), stderr);
+    free(str);
+    break;
+  }
+  case __llvm_libc::rpc::Opcode::EXIT: {
+    port->recv([](__llvm_libc::rpc::Buffer *buffer) {
+      exit(reinterpret_cast<uint32_t *>(buffer->data)[0]);
+    });
+    break;
+  }
+  default:
+    port->recv([](__llvm_libc::rpc::Buffer *) { /* no-op */ });
+    return;
+  }
+  port->close();
+}
+#endif

diff  --git a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt
index bef97af0d195f..83c61002c8ae7 100644
--- a/libc/utils/gpu/loader/amdgpu/CMakeLists.txt
+++ b/libc/utils/gpu/loader/amdgpu/CMakeLists.txt
@@ -1,7 +1,6 @@
 add_executable(amdhsa_loader Loader.cpp)
 add_dependencies(amdhsa_loader libc.src.__support.RPC.rpc)
 
-target_include_directories(amdhsa_loader PRIVATE ${LIBC_SOURCE_DIR})
 target_link_libraries(amdhsa_loader
   PRIVATE
   hsa-runtime64::hsa-runtime64

diff  --git a/libc/utils/gpu/loader/amdgpu/Loader.cpp b/libc/utils/gpu/loader/amdgpu/Loader.cpp
index 54e6caf81e2d0..af5f00878e656 100644
--- a/libc/utils/gpu/loader/amdgpu/Loader.cpp
+++ b/libc/utils/gpu/loader/amdgpu/Loader.cpp
@@ -14,8 +14,7 @@
 //===----------------------------------------------------------------------===//
 
 #include "Loader.h"
-
-#include "src/__support/RPC/rpc.h"
+#include "Server.h"
 
 #include <hsa/hsa.h>
 #include <hsa/hsa_ext_amd.h>
@@ -39,30 +38,6 @@ struct kernel_args_t {
   void *buffer;
 };
 
-static __llvm_libc::rpc::Server server;
-
-/// Queries the RPC client at least once and performs server-side work if there
-/// are any active requests.
-void handle_server() {
-  while (server.handle(
-      [&](__llvm_libc::rpc::Buffer *buffer) {
-        switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) {
-        case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
-          fputs(reinterpret_cast<const char *>(&buffer->data[1]), stderr);
-          break;
-        }
-        case __llvm_libc::rpc::Opcode::EXIT: {
-          exit(buffer->data[1]);
-          break;
-        }
-        default:
-          return;
-        };
-      },
-      [](__llvm_libc::rpc::Buffer *buffer) {}))
-    ;
-}
-
 /// Print the error code and exit if \p code indicates an error.
 static void handle_error(hsa_status_t code) {
   if (code == HSA_STATUS_SUCCESS || code == HSA_STATUS_INFO_BREAK)
@@ -376,7 +351,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
     handle_error(err);
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(server_inbox, server_outbox, buffer);
+  server.reset(&lock, server_inbox, server_outbox, buffer);
 
   // Initialize the packet header and set the doorbell signal to begin execution
   // by the HSA runtime.

diff  --git a/libc/utils/gpu/loader/nvptx/CMakeLists.txt b/libc/utils/gpu/loader/nvptx/CMakeLists.txt
index 2cad2774ce3d4..f88914383a98e 100644
--- a/libc/utils/gpu/loader/nvptx/CMakeLists.txt
+++ b/libc/utils/gpu/loader/nvptx/CMakeLists.txt
@@ -1,7 +1,6 @@
 add_executable(nvptx_loader Loader.cpp)
 add_dependencies(nvptx_loader libc.src.__support.RPC.rpc)
 
-target_include_directories(nvptx_loader PRIVATE ${LIBC_SOURCE_DIR})
 target_link_libraries(nvptx_loader
   PRIVATE
   gpu_loader

diff  --git a/libc/utils/gpu/loader/nvptx/Loader.cpp b/libc/utils/gpu/loader/nvptx/Loader.cpp
index 15ff11a3bd80e..eb8db7f48572e 100644
--- a/libc/utils/gpu/loader/nvptx/Loader.cpp
+++ b/libc/utils/gpu/loader/nvptx/Loader.cpp
@@ -14,8 +14,7 @@
 //===----------------------------------------------------------------------===//
 
 #include "Loader.h"
-
-#include "src/__support/RPC/rpc.h"
+#include "Server.h"
 
 #include "cuda.h"
 #include <cstddef>
@@ -34,30 +33,6 @@ struct kernel_args_t {
   void *buffer;
 };
 
-static __llvm_libc::rpc::Server server;
-
-/// Queries the RPC client at least once and performs server-side work if there
-/// are any active requests.
-void handle_server() {
-  while (server.handle(
-      [&](__llvm_libc::rpc::Buffer *buffer) {
-        switch (static_cast<__llvm_libc::rpc::Opcode>(buffer->data[0])) {
-        case __llvm_libc::rpc::Opcode::PRINT_TO_STDERR: {
-          fputs(reinterpret_cast<const char *>(&buffer->data[1]), stderr);
-          break;
-        }
-        case __llvm_libc::rpc::Opcode::EXIT: {
-          exit(buffer->data[1]);
-          break;
-        }
-        default:
-          return;
-        };
-      },
-      [](__llvm_libc::rpc::Buffer *buffer) {}))
-    ;
-}
-
 static void handle_error(CUresult err) {
   if (err == CUDA_SUCCESS)
     return;
@@ -155,7 +130,7 @@ int load(int argc, char **argv, char **envp, void *image, size_t size,
                          CU_LAUNCH_PARAM_END};
 
   // Initialize the RPC server's buffer for host-device communication.
-  server.reset(server_inbox, server_outbox, buffer);
+  server.reset(&lock, server_inbox, server_outbox, buffer);
 
   // Call the kernel with the given arguments.
   if (CUresult err = cuLaunchKernel(


        


More information about the libc-commits mailing list