[libc-commits] [libc] 1a5d3b6 - [libc] Scan the ports more fairly in the RPC server (#66680)

via libc-commits libc-commits at lists.llvm.org
Tue Sep 26 14:09:52 PDT 2023


Author: Joseph Huber
Date: 2023-09-26T16:09:48-05:00
New Revision: 1a5d3b6cda2c56a39bbe2a529db4d3ac3d5ffa0f

URL: https://github.com/llvm/llvm-project/commit/1a5d3b6cda2c56a39bbe2a529db4d3ac3d5ffa0f
DIFF: https://github.com/llvm/llvm-project/commit/1a5d3b6cda2c56a39bbe2a529db4d3ac3d5ffa0f.diff

LOG: [libc] Scan the ports more fairly in the RPC server (#66680)

Summary:
Currently, we use the RPC server to respond to different ports which
each contain a request from some client thread wishing to do work on the
server. This scan starts at zero and continues until it has checked all
ports at which point it resets. If we find an active port, we service it
and then restart the search.

This is bad for two reasons. First, it means that we will always be biased
toward the lower ports. If a thread grabs a high port it will be stuck for a
very long time until all the other work is done. Second, it means that
the `handle_server` function can technically run indefinitely as long as
the client is always pushing new work. Because the OpenMP implementation
uses the user thread to service the kernel, this means that it could be
stalled with another asynchronous device's kernels.

This patch addresses this by making the server restart at the next port
over. This means we will always do a full scan of the ports before
quitting.

Added: 
    

Modified: 
    libc/src/__support/RPC/rpc.h
    libc/utils/gpu/server/rpc_server.cpp

Removed: 
    


################################################################################
diff  --git a/libc/src/__support/RPC/rpc.h b/libc/src/__support/RPC/rpc.h
index 189da8671461c81..88c62dcdc340f06 100644
--- a/libc/src/__support/RPC/rpc.h
+++ b/libc/src/__support/RPC/rpc.h
@@ -318,6 +318,8 @@ template <bool T, typename S> struct Port {
     return process.packet[index].header.opcode;
   }
 
+  LIBC_INLINE uint16_t get_index() const { return index; }
+
   LIBC_INLINE void close() {
     // The server is passive, if it own the buffer when it closes we need to
     // give ownership back to the client.
@@ -367,7 +369,7 @@ template <uint32_t lane_size> struct Server {
       : process(port_count, buffer) {}
 
   using Port = rpc::Port<true, Packet<lane_size>>;
-  LIBC_INLINE cpp::optional<Port> try_open();
+  LIBC_INLINE cpp::optional<Port> try_open(uint32_t start = 0);
   LIBC_INLINE Port open();
 
   LIBC_INLINE static uint64_t allocation_size(uint32_t port_count) {
@@ -547,9 +549,9 @@ template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
 template <uint32_t lane_size>
 [[clang::convergent]] LIBC_INLINE
     cpp::optional<typename Server<lane_size>::Port>
-    Server<lane_size>::try_open() {
+    Server<lane_size>::try_open(uint32_t start) {
   // Perform a naive linear scan for a port that has a pending request.
-  for (uint32_t index = 0; index < process.port_count; ++index) {
+  for (uint32_t index = start; index < process.port_count; ++index) {
     uint64_t lane_mask = gpu::get_lane_mask();
     uint32_t in = process.load_inbox(lane_mask, index);
     uint32_t out = process.load_outbox(lane_mask, index);

diff  --git a/libc/utils/gpu/server/rpc_server.cpp b/libc/utils/gpu/server/rpc_server.cpp
index 6395a808ca98b00..1c1c9f1ae9e6b5d 100644
--- a/libc/utils/gpu/server/rpc_server.cpp
+++ b/libc/utils/gpu/server/rpc_server.cpp
@@ -36,11 +36,12 @@ struct Server {
 
   rpc_status_t handle_server(
       const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
-      const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
+      const std::unordered_map<rpc_opcode_t, void *> &callback_data,
+      uint32_t &index) {
     rpc_status_t ret = RPC_STATUS_SUCCESS;
     std::visit(
         [&](auto &server) {
-          ret = handle_server(*server, callbacks, callback_data);
+          ret = handle_server(*server, callbacks, callback_data, index);
         },
         server);
     return ret;
@@ -51,8 +52,9 @@ struct Server {
   rpc_status_t handle_server(
       rpc::Server<lane_size> &server,
       const std::unordered_map<rpc_opcode_t, rpc_opcode_callback_ty> &callbacks,
-      const std::unordered_map<rpc_opcode_t, void *> &callback_data) {
-    auto port = server.try_open();
+      const std::unordered_map<rpc_opcode_t, void *> &callback_data,
+      uint32_t &index) {
+    auto port = server.try_open(index);
     if (!port)
       return RPC_STATUS_SUCCESS;
 
@@ -203,6 +205,9 @@ struct Server {
       (handler->second)(port_ref, data);
     }
     }
+
+    // Increment the index so we start the scan after this port.
+    index = port->get_index() + 1;
     port->close();
     return RPC_STATUS_CONTINUE;
   }
@@ -333,10 +338,11 @@ rpc_status_t rpc_handle_server(uint32_t device_id) {
   if (!state->devices[device_id])
     return RPC_STATUS_ERROR;
 
+  uint32_t index = 0;
   for (;;) {
     auto &device = *state->devices[device_id];
-    rpc_status_t status =
-        device.server.handle_server(device.callbacks, device.callback_data);
+    rpc_status_t status = device.server.handle_server(
+        device.callbacks, device.callback_data, index);
     if (status != RPC_STATUS_CONTINUE)
       return status;
   }


        


More information about the libc-commits mailing list