[libc-commits] [libc] d21e507 - [libc] Implement a generic streaming interface in the RPC

Joseph Huber via libc-commits libc-commits at lists.llvm.org
Thu May 11 09:55:50 PDT 2023


Author: Joseph Huber
Date: 2023-05-11T11:55:41-05:00
New Revision: d21e507cfc9f7f5b2881e928d656fddbc9d31147

URL: https://github.com/llvm/llvm-project/commit/d21e507cfc9f7f5b2881e928d656fddbc9d31147
DIFF: https://github.com/llvm/llvm-project/commit/d21e507cfc9f7f5b2881e928d656fddbc9d31147.diff

LOG: [libc] Implement a generic streaming interface in the RPC

Currently we provide the `send_n` and `recv_n` functions. These were
somewhat divergent and not tested on the GPU. This patch changes the
support to be more common. We do this by making the CPU provide an array
of at least the lane size while the GPU can rely on the
private memory address of its stack variables. This allows us to send
data back and forth generically.

Reviewed By: JonChesterfield

Differential Revision: https://reviews.llvm.org/D150379

Added: 
    libc/test/integration/startup/gpu/rpc_stream_test.cpp

Modified: 
    libc/src/__support/RPC/rpc.h
    libc/src/__support/RPC/rpc_util.h
    libc/test/integration/startup/gpu/CMakeLists.txt
    libc/utils/gpu/loader/Server.h

Removed: 
    


################################################################################
diff  --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 84177abaad249..2f53211a5ad4e 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -37,6 +37,7 @@ enum Opcode : uint16_t {
   EXIT = 2,
   TEST_INCREMENT = 3,
   TEST_INTERFACE = 4,
+  TEST_STREAM = 5,
 };
 
 /// A fixed size channel used to communicate between the RPC client and server.
@@ -318,8 +319,10 @@ template <bool T> struct Port {
   template <typename F, typename U>
   LIBC_INLINE void send_and_recv(F fill, U use);
   template <typename W> LIBC_INLINE void recv_and_send(W work);
+  LIBC_INLINE void send_n(const void *const *src, uint64_t *size);
   LIBC_INLINE void send_n(const void *src, uint64_t size);
-  template <typename A> LIBC_INLINE void recv_n(A alloc);
+  template <typename A>
+  LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);
 
   LIBC_INLINE uint16_t get_opcode() const {
     return process.get_packet(index).header.opcode;
@@ -424,67 +427,65 @@ LIBC_INLINE void Port<T>::recv_and_send(W work) {
 /// Sends an arbitrarily sized data buffer \p src across the shared channel in
 /// multiples of the packet length.
 template <bool T>
-LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
+LIBC_INLINE void Port<T>::send_n(const void *const *src, uint64_t *size) {
   // TODO: We could send the first bytes in this call and potentially save an
   // extra send operation.
   // TODO: We may need a way for the CPU to send different strings per thread.
-  send([=](Buffer *buffer) {
-    reinterpret_cast<uint64_t *>(buffer->data)[0] = size;
+  uint64_t num_sends = 0;
+  send([&](Buffer *buffer, uint32_t id) {
+    reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
+    num_sends = is_process_gpu() ? lane_value(size, id)
+                                 : max(lane_value(size, id), num_sends);
   });
-  const uint8_t *ptr = reinterpret_cast<const uint8_t *>(src);
-  for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
-    send([=](Buffer *buffer) {
-      const uint64_t len =
-          size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data) : size - idx;
-      inline_memcpy(buffer->data, ptr + idx, len);
+  for (uint64_t idx = 0; idx < num_sends; idx += sizeof(Buffer::data)) {
+    send([=](Buffer *buffer, uint32_t id) {
+      const uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
+                               ? sizeof(Buffer::data)
+                               : lane_value(size, id) - idx;
+      if (idx < lane_value(size, id))
+        inline_memcpy(
+            buffer->data,
+            reinterpret_cast<const uint8_t *>(lane_value(src, id)) + idx, len);
     });
   }
   gpu::sync_lane(process.get_packet(index).header.mask);
 }
 
+/// Helper routine to simplify the interface when sending from the GPU using
+/// thread private pointers to the underlying value.
+template <bool T>
+LIBC_INLINE void Port<T>::send_n(const void *src, uint64_t size) {
+  static_assert(is_process_gpu(), "Only valid when running on the GPU");
+  const void **src_ptr = &src;
+  uint64_t *size_ptr = &size;
+  send_n(src_ptr, size_ptr);
+}
+
 /// Receives an arbitrarily sized data buffer across the shared channel in
 /// multiples of the packet length. The \p alloc function is called with the
 /// size of the data so that we can initialize the size of the \p dst buffer.
 template <bool T>
 template <typename A>
-LIBC_INLINE void Port<T>::recv_n(A alloc) {
-  // The GPU handles thread private variables and masking implicitly through its
-  // execution model. If this is the CPU we need to manually handle the
-  // possibility that the sent data is of different length.
-  if constexpr (is_process_gpu()) {
-    uint64_t size = 0;
-    recv([&](Buffer *buffer) {
-      size = reinterpret_cast<uint64_t *>(buffer->data)[0];
-    });
-    uint8_t *dst = reinterpret_cast<uint8_t *>(alloc(size), gpu::get_lane_id());
-    for (uint64_t idx = 0; idx < size; idx += sizeof(Buffer::data)) {
-      recv([=](Buffer *buffer) {
-        uint64_t len = size - idx > sizeof(Buffer::data) ? sizeof(Buffer::data)
-                                                         : size - idx;
-        inline_memcpy(dst + idx, buffer->data, len);
-      });
-    }
-    return;
-  } else {
-    uint64_t size[MAX_LANE_SIZE];
-    uint8_t *dst[MAX_LANE_SIZE];
-    uint64_t max = 0;
-    recv([&](Buffer *buffer, uint32_t id) {
-      size[id] = reinterpret_cast<uint64_t *>(buffer->data)[0];
-      dst[id] = reinterpret_cast<uint8_t *>(alloc(size[id], id));
-      max = size[id] > max ? size[id] : max;
+LIBC_INLINE void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
+  uint64_t num_recvs = 0;
+  recv([&](Buffer *buffer, uint32_t id) {
+    lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
+    lane_value(dst, id) =
+        reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
+    num_recvs = is_process_gpu() ? lane_value(size, id)
+                                 : max(lane_value(size, id), num_recvs);
+  });
+  for (uint64_t idx = 0; idx < num_recvs; idx += sizeof(Buffer::data)) {
+    recv([=](Buffer *buffer, uint32_t id) {
+      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
+                         ? sizeof(Buffer::data)
+                         : lane_value(size, id) - idx;
+      if (idx < lane_value(size, id))
+        inline_memcpy(reinterpret_cast<uint8_t *>(lane_value(dst, id)) + idx,
+                      buffer->data, len);
     });
-    for (uint64_t idx = 0; idx < max; idx += sizeof(Buffer::data)) {
-      recv([=](Buffer *buffer, uint32_t id) {
-        uint64_t len = size[id] - idx > sizeof(Buffer::data)
-                           ? sizeof(Buffer::data)
-                           : size[id] - idx;
-        if (idx < size[id])
-          inline_memcpy(dst[id] + idx, buffer->data, len);
-      });
-    }
-    return;
   }
+  return;
 }
 
 /// Attempts to open a port to use as the client. The client can only open a

diff  --git a/libc/src/__support/RPC/rpc_util.h b/libc/src/__support/RPC/rpc_util.h
index c6282e40c903e..b2b8cfeaa5c39 100644
--- a/libc/src/__support/RPC/rpc_util.h
+++ b/libc/src/__support/RPC/rpc_util.h
@@ -54,6 +54,21 @@ template <typename V, typename A> LIBC_INLINE V align_up(V val, A align) {
   return ((val + V(align) - 1) / V(align)) * V(align);
 }
 
+/// Utility to provide a unified interface between the CPU and GPU's memory
+/// model. On the GPU stack variables are always private to a lane so we can
+/// simply use the variable passed in. On the CPU we need to allocate enough
+/// space for the whole lane and index into it.
+template <typename V> LIBC_INLINE V &lane_value(V *val, uint32_t id) {
+  if constexpr (is_process_gpu())
+    return *val;
+  return val[id];
+}
+
+/// Helper to get the maximum value.
+template <typename T> LIBC_INLINE const T &max(const T &x, const T &y) {
+  return x < y ? y : x;
+}
+
 } // namespace rpc
 } // namespace __llvm_libc
 

diff  --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt
index 12ff49c46dbe4..02018f9c58930 100644
--- a/libc/test/integration/startup/gpu/CMakeLists.txt
+++ b/libc/test/integration/startup/gpu/CMakeLists.txt
@@ -43,3 +43,12 @@ add_integration_test(
   SRCS
    rpc_interface_test.cpp
 )
+
+add_integration_test(
+  startup_rpc_stream_test
+  SUITE libc-startup-tests
+  SRCS
+   rpc_stream_test.cpp
+  LOADER_ARGS
+   --threads-x 32
+)

diff  --git a/libc/test/integration/startup/gpu/rpc_stream_test.cpp b/libc/test/integration/startup/gpu/rpc_stream_test.cpp
new file mode 100644
index 0000000000000..877ded34b1aaa
--- /dev/null
+++ b/libc/test/integration/startup/gpu/rpc_stream_test.cpp
@@ -0,0 +1,50 @@
+//===-- Loader test to check the RPC streaming interface with the loader --===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "src/__support/GPU/utils.h"
+#include "src/__support/RPC/rpc_client.h"
+#include "src/__support/integer_to_string.h"
+#include "src/string/memory_utils/memcmp_implementations.h"
+#include "src/string/memory_utils/memcpy_implementations.h"
+#include "src/string/string_utils.h"
+#include "test/IntegrationTest/test.h"
+
+extern "C" void *malloc(uint64_t);
+
+using namespace __llvm_libc;
+
+static void test_stream() {
+  const char str[] = "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy"
+                     "ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy";
+  uint64_t send_size = sizeof(str);
+  void *send_ptr = malloc(send_size);
+  void *recv_ptr;
+  uint64_t recv_size;
+
+  inline_memcpy(send_ptr, str, send_size);
+  ASSERT_TRUE(inline_memcmp(send_ptr, str, send_size) == 0 && "Data mismatch");
+  rpc::Client::Port port = rpc::client.open<rpc::TEST_STREAM>();
+  port.send_n(send_ptr, send_size);
+  port.recv_n(&recv_ptr, &recv_size,
+              [](uint64_t size) { return malloc(size); });
+  port.close();
+  ASSERT_TRUE(inline_memcmp(recv_ptr, str, recv_size) == 0 && "Data mismatch");
+  ASSERT_TRUE(recv_size == send_size && "Data size mismatch");
+
+  free(send_ptr);
+  free(recv_ptr);
+}
+
+TEST_MAIN(int argc, char **argv, char **envp) {
+  test_stream();
+
+  return 0;
+}

diff  --git a/libc/utils/gpu/loader/Server.h b/libc/utils/gpu/loader/Server.h
index 2419de53a5cd2..2e9fdfd6e69b1 100644
--- a/libc/utils/gpu/loader/Server.h
+++ b/libc/utils/gpu/loader/Server.h
@@ -32,17 +32,13 @@ void handle_server() {
 
     switch (port->get_opcode()) {
     case rpc::Opcode::PRINT_TO_STDERR: {
-      uint64_t str_size[rpc::MAX_LANE_SIZE] = {0};
-      char *strs[rpc::MAX_LANE_SIZE] = {nullptr};
-      port->recv_n([&](uint64_t size, uint32_t id) {
-        str_size[id] = size;
-        strs[id] = new char[size];
-        return strs[id];
-      });
+      uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
+      void *strs[rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n(strs, sizes, [&](uint64_t size) { return new char[size]; });
       for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
         if (strs[i]) {
-          fwrite(strs[i], str_size[i], 1, stderr);
-          delete[] strs[i];
+          fwrite(strs[i], sizes[i], 1, stderr);
+          delete[] reinterpret_cast<uint8_t *>(strs[i]);
         }
       }
       break;
@@ -78,6 +74,17 @@ void handle_server() {
             [&](rpc::Buffer *buffer) { buffer->data[0] = cnt = cnt + 1; });
       break;
     }
+    case rpc::Opcode::TEST_STREAM: {
+      uint64_t sizes[rpc::MAX_LANE_SIZE] = {0};
+      void *dst[rpc::MAX_LANE_SIZE] = {nullptr};
+      port->recv_n(dst, sizes, [](uint64_t size) { return new char[size]; });
+      port->send_n(dst, sizes);
+      for (uint64_t i = 0; i < rpc::MAX_LANE_SIZE; ++i) {
+        if (dst[i])
+          delete[] reinterpret_cast<uint8_t *>(dst[i]);
+      }
+      break;
+    }
     default:
       port->recv([](rpc::Buffer *buffer) {});
     }


        


More information about the libc-commits mailing list