[llvm] Raw socket stream (PR #84710)

Connor Sughrue via llvm-commits llvm-commits at lists.llvm.org
Sun Mar 10 20:40:28 PDT 2024


https://github.com/cpsughrue created https://github.com/llvm/llvm-project/pull/84710

None

>From 4569e17123371fa348bbb7f314a1fc898623d249 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 14 Feb 2024 04:08:33 -0500
Subject: [PATCH 1/7] Improvements to raw_socket_stream functionality and
 documentation

---
 llvm/include/llvm/Support/raw_socket_stream.h |  72 +++++-
 llvm/lib/Support/raw_socket_stream.cpp        | 210 ++++++++++++------
 .../Support/raw_socket_stream_test.cpp        |   7 +-
 3 files changed, 218 insertions(+), 71 deletions(-)

diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index c219792d82465d..e41aed755409da 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -17,12 +17,18 @@
 #include "llvm/Support/Threading.h"
 #include "llvm/Support/raw_ostream.h"
 
+#include <atomic>
+#include <chrono>
+
 namespace llvm {
 
 class raw_socket_stream;
 
-// Make sure that calls to WSAStartup and WSACleanup are balanced.
 #ifdef _WIN32
+/// @brief Ensures proper initialization and cleanup of winsock resources
+///
+/// @details
+/// Make sure that calls to WSAStartup and WSACleanup are balanced.
 class WSABalancer {
 public:
   WSABalancer();
@@ -30,22 +36,73 @@ class WSABalancer {
 };
 #endif // _WIN32
 
+/// @class ListeningSocket
+/// @brief Manages a passive (i.e., listening) UNIX domain socket
+///
+/// The ListeningSocket class encapsulates a UNIX domain socket that can listen
+/// and accept incoming connections. ListeningSocket is portable and supports
+/// Windows builds beginning with Insider Build 17063. ListeningSocket is
+/// designed for server-side operations, working alongside raw_socket_streams
+/// that function as client connections.
+///
+/// Usage example:
+/// @code{.cpp}
+/// std::string Path = "/path/to/socket"
+/// Expected<ListeningSocket> S = ListeningSocket::createListeningSocket(Path);
+///
+/// if (listeningSocket) {
+///     auto connection = S->accept();
+///     if (connection) {
+///         // Use the accepted raw_socket_stream for communication.
+///     }
+/// }
+/// @endcode
+///
 class ListeningSocket {
-  int FD;
+  std::atomic<int> FD;
   std::string SocketPath;
   ListeningSocket(int SocketFD, StringRef SocketPath);
+
 #ifdef _WIN32
   WSABalancer _;
 #endif // _WIN32
 
 public:
-  static Expected<ListeningSocket> createUnix(
+  ~ListeningSocket();
+  ListeningSocket(ListeningSocket &&LS);
+  ListeningSocket(const ListeningSocket &LS) = delete;
+  ListeningSocket &operator=(const ListeningSocket &) = delete;
+
+  /// Closes the socket's FD and unlinks the socket file from the file system.
+  /// The method is idempotent
+  void shutdown();
+
+  /// Accepts an incoming connection on the listening socket. This method can
+  /// optionally either block until a connection is available or timeout after a
+  /// specified amount of time has passed. By default the method will block
+  /// until the socket has received a connection
+  ///
+  /// @param Timeout An optional timeout duration in microseconds
+  ///
+  Expected<std::unique_ptr<raw_socket_stream>>
+  accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
+
+  /// Creates a listening socket bound to the specified file system path.
+  /// Handles the socket creation, binding, and immediately starts listening for
+  /// incoming connections.
+  ///
+  /// @param SocketPath The file system path where the socket will be created
+  /// @param MaxBacklog The max number of connections in a socket's backlog
+  ///
+  static Expected<ListeningSocket> createListeningSocket(
       StringRef SocketPath,
       int MaxBacklog = llvm::hardware_concurrency().compute_thread_count());
-  Expected<std::unique_ptr<raw_socket_stream>> accept();
-  ListeningSocket(ListeningSocket &&LS);
-  ~ListeningSocket();
 };
+
+//===----------------------------------------------------------------------===//
+//  raw_socket_stream
+//===----------------------------------------------------------------------===//
+
 class raw_socket_stream : public raw_fd_stream {
   uint64_t current_pos() const override { return 0; }
 #ifdef _WIN32
@@ -53,11 +110,12 @@ class raw_socket_stream : public raw_fd_stream {
 #endif // _WIN32
 
 public:
+  // TODO: Should probably be private
   raw_socket_stream(int SocketFD);
   /// Create a \p raw_socket_stream connected to the Unix domain socket at \p
   /// SocketPath.
   static Expected<std::unique_ptr<raw_socket_stream>>
-  createConnectedUnix(StringRef SocketPath);
+  createConnectedSocket(StringRef SocketPath);
   ~raw_socket_stream();
 };
 
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index afb0ed11b2c24e..46385c7ea29b87 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -14,6 +14,9 @@
 #include "llvm/Support/raw_socket_stream.h"
 #include "llvm/Config/config.h"
 #include "llvm/Support/Error.h"
+#include "llvm/Support/FileSystem.h"
+
+#include <atomic>
 
 #ifndef _WIN32
 #include <sys/socket.h>
@@ -45,7 +48,6 @@ WSABalancer::WSABalancer() {
 }
 
 WSABalancer::~WSABalancer() { WSACleanup(); }
-
 #endif // _WIN32
 
 static std::error_code getLastSocketErrorCode() {
@@ -56,117 +58,201 @@ static std::error_code getLastSocketErrorCode() {
 #endif
 }
 
+static void closeFD(int FD) {
+#ifdef _WIN32
+  // on windows ::close is a deprecated alias for _close
+  _close(FD);
+#else
+  ::close(FD);
+#endif
+}
+
+static void unlinkFile(StringRef Path) {
+#ifdef _WIN32
+  // on windows ::unlink is a deprecated alias for _unlink
+  _unlink(Path.str().c_str());
+#else
+  ::unlink(Path.str().c_str());
+#endif
+}
+
+static sockaddr_un setSocketAddr(StringRef SocketPath) {
+  struct sockaddr_un Addr;
+  memset(&Addr, 0, sizeof(Addr));
+  Addr.sun_family = AF_UNIX;
+  strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
+  return Addr;
+}
+
+static Expected<int> getSocketFD(StringRef SocketPath) {
+#ifdef _WIN32
+  SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (MaybeSocket == INVALID_SOCKET) {
+#else
+  int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (MaybeSocket == -1) {
+#endif // _WIN32
+    return llvm::make_error<StringError>(getLastSocketErrorCode(),
+                                         "Create socket failed");
+  }
+
+  struct sockaddr_un Addr = setSocketAddr(SocketPath);
+  if (::connect(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
+    return llvm::make_error<StringError>(getLastSocketErrorCode(),
+                                         "Connect socket failed");
+
+#ifdef _WIN32
+  return _open_osfhandle(MaybeWinsocket, 0);
+#else
+  return MaybeSocket;
+#endif // _WIN32
+}
+
 ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath)
     : FD(SocketFD), SocketPath(SocketPath) {}
 
 ListeningSocket::ListeningSocket(ListeningSocket &&LS)
-    : FD(LS.FD), SocketPath(LS.SocketPath) {
+    : FD(LS.FD.load()), SocketPath(LS.SocketPath) {
+
+  LS.SocketPath.clear();
   LS.FD = -1;
 }
 
-Expected<ListeningSocket> ListeningSocket::createUnix(StringRef SocketPath,
-                                                      int MaxBacklog) {
+Expected<ListeningSocket>
+ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
+
+  // Handle instances where the target socket address already exists and
+  // differentiate between a preexisting file with and without a bound socket
+  //
+  // ::bind will return std::errc::address_in_use if a file at the socket
+  // address already exists (e.g., the file was not properly unlinked due to a
+  // crash) even if another socket has not yet bound to that address
+  if (llvm::sys::fs::exists(SocketPath)) {
+    Expected<int> MaybeFD = getSocketFD(SocketPath);
+    if (!MaybeFD) {
+
+      // Regardless of the error, notify the caller that a file already exists
+      // at the desired socket address. The file must be removed before ::bind
+      // can use the socket address
+      consumeError(MaybeFD.takeError());
+      return llvm::make_error<StringError>(
+          std::make_error_code(std::errc::file_exists),
+          "Socket address unavailable");
+    }
+    closeFD(std::move(*MaybeFD));
+
+    // Notify caller that the provided socket address already has a bound socket
+    return llvm::make_error<StringError>(
+        std::make_error_code(std::errc::address_in_use),
+        "Socket address unavailable");
+  }
 
 #ifdef _WIN32
   WSABalancer _;
-  SOCKET MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeWinsocket == INVALID_SOCKET) {
+  SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (MaybeSocket == INVALID_SOCKET) {
 #else
-  int MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeWinsocket == -1) {
+  int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (MaybeSocket == -1) {
 #endif
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "socket create failed");
   }
 
-  struct sockaddr_un Addr;
-  memset(&Addr, 0, sizeof(Addr));
-  Addr.sun_family = AF_UNIX;
-  strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
-
-  if (bind(MaybeWinsocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
-    std::error_code Err = getLastSocketErrorCode();
-    if (Err == std::errc::address_in_use)
-      ::close(MaybeWinsocket);
-    return llvm::make_error<StringError>(Err, "Bind error");
+  struct sockaddr_un Addr = setSocketAddr(SocketPath);
+  if (::bind(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
+    // Grab error code from call to ::bind before calling ::close
+    std::error_code EC = getLastSocketErrorCode();
+    ::close(MaybeSocket);
+    return llvm::make_error<StringError>(EC, "Bind error");
   }
-  if (listen(MaybeWinsocket, MaxBacklog) == -1) {
+
+  // Mark socket as passive so incoming connections can be accepted
+  if (::listen(MaybeSocket, MaxBacklog) == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "Listen error");
-  }
-  int UnixSocket;
+
+  int Socket;
 #ifdef _WIN32
-  UnixSocket = _open_osfhandle(MaybeWinsocket, 0);
+  Socket = _open_osfhandle(MaybeWinsocket, 0);
 #else
-  UnixSocket = MaybeWinsocket;
+  Socket = MaybeSocket;
 #endif // _WIN32
-  return ListeningSocket{UnixSocket, SocketPath};
+  return ListeningSocket{Socket, SocketPath};
 }
 
-Expected<std::unique_ptr<raw_socket_stream>> ListeningSocket::accept() {
+Expected<std::unique_ptr<raw_socket_stream>>
+ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
+
+  int SelectStatus;
   int AcceptFD;
+
 #ifdef _WIN32
   SOCKET WinServerSock = _get_osfhandle(FD);
-  SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
-  AcceptFD = _open_osfhandle(WinAcceptSock, 0);
+#endif
+
+  fd_set Readfds;
+  if (Timeout.has_value()) {
+    timeval TV = {0, Timeout.value().count()};
+    FD_ZERO(&Readfds);
+#ifdef _WIN32
+    FD_SET(WinServerSock, &Readfds);
+#else
+    FD_SET(FD, &Readfds);
+#endif
+    SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
+  } else {
+    SelectStatus = 1;
+  }
+
+  if (SelectStatus == -1)
+    return llvm::make_error<StringError>(getLastSocketErrorCode(),
+                                         "Select failed");
+
+  if (SelectStatus) {
+#ifdef _WIN32
+    SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
+    AcceptFD = _open_osfhandle(WinAcceptSock, 0);
 #else
-  AcceptFD = ::accept(FD, NULL, NULL);
-#endif //_WIN32
+    AcceptFD = ::accept(FD, NULL, NULL);
+#endif
+  } else
+    return llvm::make_error<StringError>(
+        std::make_error_code(std::errc::timed_out), "Accept timed out");
+
   if (AcceptFD == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "Accept failed");
+
   return std::make_unique<raw_socket_stream>(AcceptFD);
 }
 
-ListeningSocket::~ListeningSocket() {
+void ListeningSocket::shutdown() {
   if (FD == -1)
     return;
-  ::close(FD);
-  unlink(SocketPath.c_str());
+  closeFD(FD);
+  unlinkFile(SocketPath);
+  FD = -1;
 }
 
-static Expected<int> GetSocketFD(StringRef SocketPath) {
-#ifdef _WIN32
-  SOCKET MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeWinsocket == INVALID_SOCKET) {
-#else
-  int MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeWinsocket == -1) {
-#endif // _WIN32
-    return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "Create socket failed");
-  }
+ListeningSocket::~ListeningSocket() { shutdown(); }
 
-  struct sockaddr_un Addr;
-  memset(&Addr, 0, sizeof(Addr));
-  Addr.sun_family = AF_UNIX;
-  strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
-
-  int status = connect(MaybeWinsocket, (struct sockaddr *)&Addr, sizeof(Addr));
-  if (status == -1) {
-    return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "Connect socket failed");
-  }
-#ifdef _WIN32
-  return _open_osfhandle(MaybeWinsocket, 0);
-#else
-  return MaybeWinsocket;
-#endif // _WIN32
-}
+//===----------------------------------------------------------------------===//
+//  raw_socket_stream
+//===----------------------------------------------------------------------===//
 
 raw_socket_stream::raw_socket_stream(int SocketFD)
     : raw_fd_stream(SocketFD, true) {}
 
 Expected<std::unique_ptr<raw_socket_stream>>
-raw_socket_stream::createConnectedUnix(StringRef SocketPath) {
+raw_socket_stream::createConnectedSocket(StringRef SocketPath) {
 #ifdef _WIN32
   WSABalancer _;
 #endif // _WIN32
-  Expected<int> FD = GetSocketFD(SocketPath);
+  Expected<int> FD = getSocketFD(SocketPath);
   if (!FD)
     return FD.takeError();
   return std::make_unique<raw_socket_stream>(*FD);
 }
 
 raw_socket_stream::~raw_socket_stream() {}
-
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 6903862e540315..e6dafe5e062d0b 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -35,16 +35,19 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
   llvm::sys::fs::createUniquePath("test_raw_socket_stream.sock", SocketPath,
                                   true);
 
+  // Make sure socket file does not exist. May still be there from the last test
+  std::remove(SocketPath.c_str());
+
   char Bytes[8];
 
   Expected<ListeningSocket> MaybeServerListener =
-      ListeningSocket::createUnix(SocketPath);
+      ListeningSocket::createListeningSocket(SocketPath);
   ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
 
   ListeningSocket ServerListener = std::move(*MaybeServerListener);
 
   Expected<std::unique_ptr<raw_socket_stream>> MaybeClient =
-      raw_socket_stream::createConnectedUnix(SocketPath);
+      raw_socket_stream::createConnectedSocket(SocketPath);
   ASSERT_THAT_EXPECTED(MaybeClient, llvm::Succeeded());
 
   raw_socket_stream &Client = **MaybeClient;

>From 16d3f23ec97ca7120f43fcc10fe6e2262a95cbcd Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 00:56:00 -0500
Subject: [PATCH 2/7] Address feedback

  - use ::close and ::unlink instead of _close and _unlink on Windows
  - fix name of raw_socket_stream functions to specify Unix
  - use \ instead of @ in doxygen comments
  - write unittest to make sure timeout works correctly
---
 llvm/include/llvm/Support/raw_socket_stream.h | 38 ++++++------
 llvm/lib/Support/raw_socket_stream.cpp        | 58 ++++++-------------
 .../Support/raw_socket_stream_test.cpp        | 43 +++++++++++---
 3 files changed, 73 insertions(+), 66 deletions(-)

diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index e41aed755409da..01f02bb8d6397d 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -25,9 +25,8 @@ namespace llvm {
 class raw_socket_stream;
 
 #ifdef _WIN32
-/// @brief Ensures proper initialization and cleanup of winsock resources
+/// \brief Ensures proper initialization and cleanup of winsock resources
 ///
-/// @details
 /// Make sure that calls to WSAStartup and WSACleanup are balanced.
 class WSABalancer {
 public:
@@ -36,27 +35,27 @@ class WSABalancer {
 };
 #endif // _WIN32
 
-/// @class ListeningSocket
-/// @brief Manages a passive (i.e., listening) UNIX domain socket
+/// \class ListeningSocket
+/// \brief Manages a passive (i.e., listening) UNIX domain socket
 ///
 /// The ListeningSocket class encapsulates a UNIX domain socket that can listen
 /// and accept incoming connections. ListeningSocket is portable and supports
 /// Windows builds beginning with Insider Build 17063. ListeningSocket is
-/// designed for server-side operations, working alongside raw_socket_streams
+/// designed for server-side operations, working alongside \p raw_socket_streams
 /// that function as client connections.
 ///
 /// Usage example:
-/// @code{.cpp}
+/// \code{.cpp}
 /// std::string Path = "/path/to/socket"
 /// Expected<ListeningSocket> S = ListeningSocket::createListeningSocket(Path);
 ///
-/// if (listeningSocket) {
-///     auto connection = S->accept();
-///     if (connection) {
-///         // Use the accepted raw_socket_stream for communication.
-///     }
+/// if (S) {
+///   Expected<std::unique_ptr<raw_socket_stream>> connection = S->accept();
+///   if (connection) {
+///     // Use the accepted raw_socket_stream for communication.
+///   }
 /// }
-/// @endcode
+/// \endcode
 ///
 class ListeningSocket {
   std::atomic<int> FD;
@@ -74,7 +73,7 @@ class ListeningSocket {
   ListeningSocket &operator=(const ListeningSocket &) = delete;
 
   /// Closes the socket's FD and unlinks the socket file from the file system.
-  /// The method is idempotent
+  /// The method is thread and signal safe
   void shutdown();
 
   /// Accepts an incoming connection on the listening socket. This method can
@@ -82,7 +81,7 @@ class ListeningSocket {
   /// specified amount of time has passed. By default the method will block
   /// until the socket has received a connection
   ///
-  /// @param Timeout An optional timeout duration in microseconds
+  /// \param Timeout An optional timeout duration in microseconds
   ///
   Expected<std::unique_ptr<raw_socket_stream>>
   accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
@@ -91,10 +90,10 @@ class ListeningSocket {
   /// Handles the socket creation, binding, and immediately starts listening for
   /// incoming connections.
   ///
-  /// @param SocketPath The file system path where the socket will be created
-  /// @param MaxBacklog The max number of connections in a socket's backlog
+  /// \param SocketPath The file system path where the socket will be created
+  /// \param MaxBacklog The max number of connections in a socket's backlog
   ///
-  static Expected<ListeningSocket> createListeningSocket(
+  static Expected<ListeningSocket> createListeningUnixSocket(
       StringRef SocketPath,
       int MaxBacklog = llvm::hardware_concurrency().compute_thread_count());
 };
@@ -110,12 +109,11 @@ class raw_socket_stream : public raw_fd_stream {
 #endif // _WIN32
 
 public:
-  // TODO: Should probably be private
   raw_socket_stream(int SocketFD);
-  /// Create a \p raw_socket_stream connected to the Unix domain socket at \p
+  /// Create a \p raw_socket_stream connected to the UNIX domain socket at \p
   /// SocketPath.
   static Expected<std::unique_ptr<raw_socket_stream>>
-  createConnectedSocket(StringRef SocketPath);
+  createConnectedUnixSocket(StringRef SocketPath);
   ~raw_socket_stream();
 };
 
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 46385c7ea29b87..fafc880a7105b7 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -58,24 +58,6 @@ static std::error_code getLastSocketErrorCode() {
 #endif
 }
 
-static void closeFD(int FD) {
-#ifdef _WIN32
-  // on windows ::close is a deprecated alias for _close
-  _close(FD);
-#else
-  ::close(FD);
-#endif
-}
-
-static void unlinkFile(StringRef Path) {
-#ifdef _WIN32
-  // on windows ::unlink is a deprecated alias for _unlink
-  _unlink(Path.str().c_str());
-#else
-  ::unlink(Path.str().c_str());
-#endif
-}
-
 static sockaddr_un setSocketAddr(StringRef SocketPath) {
   struct sockaddr_un Addr;
   memset(&Addr, 0, sizeof(Addr));
@@ -119,7 +101,8 @@ ListeningSocket::ListeningSocket(ListeningSocket &&LS)
 }
 
 Expected<ListeningSocket>
-ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
+ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
+                                           int MaxBacklog) {
 
   // Handle instances where the target socket address already exists and
   // differentiate between a preexisting file with and without a bound socket
@@ -132,14 +115,14 @@ ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
     if (!MaybeFD) {
 
       // Regardless of the error, notify the caller that a file already exists
-      // at the desired socket address. The file must be removed before ::bind
-      // can use the socket address
+      // at the desired socket address and that there is no bound socket at that
+      // address. The file must be removed before ::bind can use the address
       consumeError(MaybeFD.takeError());
       return llvm::make_error<StringError>(
           std::make_error_code(std::errc::file_exists),
           "Socket address unavailable");
     }
-    closeFD(std::move(*MaybeFD));
+    ::close(std::move(*MaybeFD));
 
     // Notify caller that the provided socket address already has a bound socket
     return llvm::make_error<StringError>(
@@ -184,31 +167,28 @@ ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
 Expected<std::unique_ptr<raw_socket_stream>>
 ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
 
-  int SelectStatus;
-  int AcceptFD;
-
+  fd_set Readfds;
+  FD_ZERO(&Readfds);
 #ifdef _WIN32
   SOCKET WinServerSock = _get_osfhandle(FD);
+  FD_SET(WinServerSock, &Readfds);
+#else
+  FD_SET(FD, &Readfds);
 #endif
 
-  fd_set Readfds;
+  int SelectStatus;
   if (Timeout.has_value()) {
     timeval TV = {0, Timeout.value().count()};
-    FD_ZERO(&Readfds);
-#ifdef _WIN32
-    FD_SET(WinServerSock, &Readfds);
-#else
-    FD_SET(FD, &Readfds);
-#endif
     SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
   } else {
-    SelectStatus = 1;
+    SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, NULL);
   }
 
   if (SelectStatus == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "Select failed");
+                                         "select failed");
 
+  int AcceptFD;
   if (SelectStatus) {
 #ifdef _WIN32
     SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
@@ -218,11 +198,11 @@ ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
 #endif
   } else
     return llvm::make_error<StringError>(
-        std::make_error_code(std::errc::timed_out), "Accept timed out");
+        std::make_error_code(std::errc::timed_out), "accept timed out");
 
   if (AcceptFD == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "Accept failed");
+                                         "accept failed");
 
   return std::make_unique<raw_socket_stream>(AcceptFD);
 }
@@ -230,8 +210,8 @@ ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
 void ListeningSocket::shutdown() {
   if (FD == -1)
     return;
-  closeFD(FD);
-  unlinkFile(SocketPath);
+  ::close(FD);
+  ::unlink(SocketPath.c_str());
   FD = -1;
 }
 
@@ -245,7 +225,7 @@ raw_socket_stream::raw_socket_stream(int SocketFD)
     : raw_fd_stream(SocketFD, true) {}
 
 Expected<std::unique_ptr<raw_socket_stream>>
-raw_socket_stream::createConnectedSocket(StringRef SocketPath) {
+raw_socket_stream::createConnectedUnixSocket(StringRef SocketPath) {
 #ifdef _WIN32
   WSABalancer _;
 #endif // _WIN32
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index e6dafe5e062d0b..4ef9204bfa8a9a 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -32,22 +32,19 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
     GTEST_SKIP();
 
   SmallString<100> SocketPath;
-  llvm::sys::fs::createUniquePath("test_raw_socket_stream.sock", SocketPath,
-                                  true);
+  llvm::sys::fs::createUniquePath("client_server_comms.sock", SocketPath, true);
 
   // Make sure socket file does not exist. May still be there from the last test
   std::remove(SocketPath.c_str());
 
-  char Bytes[8];
-
   Expected<ListeningSocket> MaybeServerListener =
-      ListeningSocket::createListeningSocket(SocketPath);
+      ListeningSocket::createListeningUnixSocket(SocketPath);
   ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
 
   ListeningSocket ServerListener = std::move(*MaybeServerListener);
 
   Expected<std::unique_ptr<raw_socket_stream>> MaybeClient =
-      raw_socket_stream::createConnectedSocket(SocketPath);
+      raw_socket_stream::createConnectedUnixSocket(SocketPath);
   ASSERT_THAT_EXPECTED(MaybeClient, llvm::Succeeded());
 
   raw_socket_stream &Client = **MaybeClient;
@@ -61,6 +58,7 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
   Client << "01234567";
   Client.flush();
 
+  char Bytes[8];
   ssize_t BytesRead = Server.read(Bytes, 8);
 
   std::string string(Bytes, 8);
@@ -68,4 +66,35 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
   ASSERT_EQ(8, BytesRead);
   ASSERT_EQ("01234567", string);
 }
-} // namespace
\ No newline at end of file
+
+TEST(raw_socket_streamTest, TIMEOUT_PROVIDED) {
+  if (!hasUnixSocketSupport())
+    GTEST_SKIP();
+
+  SmallString<100> SocketPath;
+  llvm::sys::fs::createUniquePath("timout_provided.sock", SocketPath, true);
+
+  // Make sure socket file does not exist. May still be there from the last test
+  std::remove(SocketPath.c_str());
+
+  Expected<ListeningSocket> MaybeServerListener =
+      ListeningSocket::createListeningUnixSocket(SocketPath);
+  ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
+  ListeningSocket ServerListener = std::move(*MaybeServerListener);
+
+  std::chrono::seconds Timeout = std::chrono::seconds(5);
+  auto Start = std::chrono::steady_clock::now();
+  Expected<std::unique_ptr<raw_socket_stream>> MaybeServer =
+      ServerListener.accept(Timeout);
+  auto End = std::chrono::steady_clock::now();
+  auto Duration = std::chrono::duration_cast<std::chrono::seconds>(End - Start);
+  ASSERT_NEAR(Duration.count(), Timeout.count(), 1);
+
+  ASSERT_THAT_EXPECTED(MaybeServer, Failed());
+  llvm::Error Err = MaybeServer.takeError();
+  llvm::handleAllErrors(std::move(Err), [&](const llvm::StringError &SE) {
+    std::error_code EC = SE.convertToErrorCode();
+    ASSERT_EQ(EC, std::errc::timed_out);
+  });
+}
+} // namespace

>From 8546986f41271de2d422d7ac2215861065facf98 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 12:46:03 -0500
Subject: [PATCH 3/7] Use poll() instead of select() to manage timeout

---
 llvm/include/llvm/Support/raw_socket_stream.h |  4 +-
 llvm/lib/Support/raw_socket_stream.cpp        | 38 +++++++++----------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 01f02bb8d6397d..570f2f73436c68 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -81,10 +81,10 @@ class ListeningSocket {
   /// specified amount of time has passed. By default the method will block
   /// until the socket has received a connection
   ///
-  /// \param Timeout An optional timeout duration in microseconds
+  /// \param Timeout An optional timeout duration in milliseconds
   ///
   Expected<std::unique_ptr<raw_socket_stream>>
-  accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
+  accept(std::optional<std::chrono::milliseconds> Timeout = std::nullopt);
 
   /// Creates a listening socket bound to the specified file system path.
   /// Handles the socket creation, binding, and immediately starts listening for
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index fafc880a7105b7..a8001ade3205a4 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -17,6 +17,7 @@
 #include "llvm/Support/FileSystem.h"
 
 #include <atomic>
+#include <poll.h>
 
 #ifndef _WIN32
 #include <sys/socket.h>
@@ -165,45 +166,44 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
 }
 
 Expected<std::unique_ptr<raw_socket_stream>>
-ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
+ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
 
-  fd_set Readfds;
-  FD_ZERO(&Readfds);
+  struct pollfd FDs[1];
+  FDs[0].events = POLLIN;
 #ifdef _WIN32
   SOCKET WinServerSock = _get_osfhandle(FD);
-  FD_SET(WinServerSock, &Readfds);
+  FDs[0].fd = WinServerSock;
 #else
-  FD_SET(FD, &Readfds);
+  FDs[0].fd = FD;
 #endif
 
-  int SelectStatus;
-  if (Timeout.has_value()) {
-    timeval TV = {0, Timeout.value().count()};
-    SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
-  } else {
-    SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, NULL);
-  }
+  int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
+  int PollStatus = ::poll(FDs, 1, TimeoutCount);
 
-  if (SelectStatus == -1)
+  if (PollStatus == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "select failed");
+                                         "poll failed");
+  if (PollStatus == 0)
+    return llvm::make_error<StringError>(
+        std::make_error_code(std::errc::timed_out),
+        "No client requests within timeout window");
+
+  if (FDs[0].revents & POLLNVAL)
+    return llvm::make_error<StringError>(
+        std::make_error_code(std::errc::bad_file_descriptor),
+        "File descriptor closed by another thread");
 
   int AcceptFD;
-  if (SelectStatus) {
 #ifdef _WIN32
     SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
     AcceptFD = _open_osfhandle(WinAcceptSock, 0);
 #else
     AcceptFD = ::accept(FD, NULL, NULL);
 #endif
-  } else
-    return llvm::make_error<StringError>(
-        std::make_error_code(std::errc::timed_out), "accept timed out");
 
   if (AcceptFD == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "accept failed");
-
   return std::make_unique<raw_socket_stream>(AcceptFD);
 }
 

>From a55d698c87948faaa2ad25812787047d5158da37 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 16:05:06 -0500
Subject: [PATCH 4/7] WIP (not portable) - Add self-pipe trick to
 ListeningSocket

---
 llvm/include/llvm/Support/raw_socket_stream.h |  9 ++++-
 llvm/lib/Support/raw_socket_stream.cpp        | 19 ++++++++--
 .../Support/raw_socket_stream_test.cpp        | 36 +++++++++++++++++++
 3 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 570f2f73436c68..57199b5960b267 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -58,8 +58,15 @@ class WSABalancer {
 /// \endcode
 ///
 class ListeningSocket {
+
+  /// If ListeningSocket::shutdown is used by a signal handler to clean up
+  /// ListeningSocket resources FD may be closed while ::poll is waiting for
+  /// FD to become ready to perform I/O. When FD is closed ::poll will
+  /// continue to block so use the self-pipe trick to get ::poll to return
+  int PipeFD[2];
+  std::mutex PipeMutex;
   std::atomic<int> FD;
-  std::string SocketPath;
+  std::string SocketPath; // Never modified
   ListeningSocket(int SocketFD, StringRef SocketPath);
 
 #ifdef _WIN32
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index a8001ade3205a4..09d67657323f78 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -17,7 +17,9 @@
 #include "llvm/Support/FileSystem.h"
 
 #include <atomic>
+#include <fcntl.h>
 #include <poll.h>
+#include <thread>
 
 #ifndef _WIN32
 #include <sys/socket.h>
@@ -168,7 +170,7 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
 Expected<std::unique_ptr<raw_socket_stream>>
 ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
 
-  struct pollfd FDs[1];
+  struct pollfd FDs[2];
   FDs[0].events = POLLIN;
 #ifdef _WIN32
   SOCKET WinServerSock = _get_osfhandle(FD);
@@ -177,8 +179,16 @@ ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
   FDs[0].fd = FD;
 #endif
 
+  FDs[1].events = POLLIN;
+  PipeMutex.lock();
+  if (::pipe(PipeFD) == -1)
+    return llvm::make_error<StringError>(getLastSocketErrorCode(),
+                                         "pipe failed");
+  FDs[1].fd = PipeFD[0];
+  PipeMutex.unlock();
+
   int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
-  int PollStatus = ::poll(FDs, 1, TimeoutCount);
+  int PollStatus = ::poll(FDs, 2, TimeoutCount);
 
   if (PollStatus == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
@@ -212,6 +222,11 @@ void ListeningSocket::shutdown() {
     return;
   ::close(FD);
   ::unlink(SocketPath.c_str());
+
+  char Byte = 'A';
+  PipeMutex.lock();
+  write(PipeFD[1], &Byte, 1);
+  PipeMutex.unlock();
   FD = -1;
 }
 
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 4ef9204bfa8a9a..179b9d89f03c1f 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -97,4 +97,40 @@ TEST(raw_socket_streamTest, TIMEOUT_PROVIDED) {
     ASSERT_EQ(EC, std::errc::timed_out);
   });
 }
+
+TEST(raw_socket_streamTest, FILE_DESCRIPTOR_CLOSED) {
+  if (!hasUnixSocketSupport())
+    GTEST_SKIP();
+
+  SmallString<100> SocketPath;
+  llvm::sys::fs::createUniquePath("fd_closed.sock", SocketPath, true);
+
+  // Make sure socket file does not exist. May still be there from the last test
+  std::remove(SocketPath.c_str());
+
+  Expected<ListeningSocket> MaybeServerListener =
+      ListeningSocket::createListeningUnixSocket(SocketPath);
+  ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
+  ListeningSocket ServerListener = std::move(*MaybeServerListener);
+
+  // Create a separate thread to close the socket after a delay. Simulates a
+  // signal handler calling ServerListener::shutdown
+  std::thread CloseThread([&]() {
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    ServerListener.shutdown();
+  });
+
+  Expected<std::unique_ptr<raw_socket_stream>> MaybeServer =
+      ServerListener.accept();
+
+  // Wait for the CloseThread to finish
+  CloseThread.join();
+
+  ASSERT_THAT_EXPECTED(MaybeServer, Failed());
+  llvm::Error Err = MaybeServer.takeError();
+  llvm::handleAllErrors(std::move(Err), [&](const llvm::StringError &SE) {
+    std::error_code EC = SE.convertToErrorCode();
+    ASSERT_EQ(EC, std::errc::bad_file_descriptor);
+  });
+}
 } // namespace

>From 0520fd9e7b1e90c7108839e408900acaf44f1ae8 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 20:42:52 -0500
Subject: [PATCH 5/7] Fix some windows build issues

---
 llvm/include/llvm/Support/raw_socket_stream.h | 29 +++++---
 llvm/lib/Support/raw_socket_stream.cpp        | 73 +++++++++++--------
 2 files changed, 59 insertions(+), 43 deletions(-)

diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 57199b5960b267..b3ab9f70bbe606 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -59,15 +59,16 @@ class WSABalancer {
 ///
 class ListeningSocket {
 
-  /// If ListeningSocket::shutdown is used by a signal handler to clean up
-  /// ListeningSocket resources FD may be closed while ::poll is waiting for
-  /// FD to become ready to perform I/O. When FD is closed ::poll will
-  /// continue to block so use the self-pipe trick to get ::poll to return
-  int PipeFD[2];
-  std::mutex PipeMutex;
   std::atomic<int> FD;
-  std::string SocketPath; // Never modified
-  ListeningSocket(int SocketFD, StringRef SocketPath);
+  std::string SocketPath; // Never modified after construction
+
+  /// If a separate thread calls ListeningSocket::shutdown, the ListeningSocket
+  /// file descriptor (FD) could be closed while ::poll is waiting for it to be
+  /// ready to perform I/O operations. ::poll will continue to block even after
+  /// FD is closed so use a self-pipe mechanism to get ::poll to return
+  int PipeFD[2]; // Never modified after construction
+
+  ListeningSocket(int SocketFD, StringRef SocketPath, int PipeFD[2]);
 
 #ifdef _WIN32
   WSABalancer _;
@@ -79,14 +80,20 @@ class ListeningSocket {
   ListeningSocket(const ListeningSocket &LS) = delete;
   ListeningSocket &operator=(const ListeningSocket &) = delete;
 
-  /// Closes the socket's FD and unlinks the socket file from the file system.
-  /// The method is thread and signal safe
+  /// Closes the FD, unlinks the socket file, and writes to PipeFD.
+  ///
+  /// After the construction of the ListeningSocket, shutdown is signal safe if
+  /// it is called during the lifetime of the object. shutdown can be called
+  /// concurrently with ListeningSocket::accept as writing to PipeFD will cause
+  /// a blocking call to ::poll to return.
+  ///
+  /// Once shutdown is called there is no way to reinitialize ListeningSocket.
   void shutdown();
 
   /// Accepts an incoming connection on the listening socket. This method can
   /// optionally either block until a connection is available or timeout after a
   /// specified amount of time has passed. By default the method will block
-  /// until the socket has recieved a connection
+  /// until the socket has received a connection.
   ///
   /// \param Timeout An optional timeout duration in milliseconds
   ///
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 09d67657323f78..76c4c4bd5a5336 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -71,36 +71,40 @@ static sockaddr_un setSocketAddr(StringRef SocketPath) {
 
 static Expected<int> getSocketFD(StringRef SocketPath) {
 #ifdef _WIN32
-  SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeSocket == INVALID_SOCKET) {
+  SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (Socket == INVALID_SOCKET) {
 #else
-  int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeSocket == -1) {
+  int Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (Socket == -1) {
 #endif // _WIN32
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "Create socket failed");
   }
 
   struct sockaddr_un Addr = setSocketAddr(SocketPath);
-  if (::connect(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
+  if (::connect(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "Connect socket failed");
 
 #ifdef _WIN32
-  return _open_osfhandle(MaybeWinsocket, 0);
+  return _open_osfhandle(Socket, 0);
 #else
-  return MaybeSocket;
+  return Socket;
 #endif // _WIN32
 }
 
-ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath)
-    : FD(SocketFD), SocketPath(SocketPath) {}
+ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath,
+                                 int PipeFD[2])
+    : FD(SocketFD), SocketPath(SocketPath), PipeFD{PipeFD[0], PipeFD[1]} {}
 
 ListeningSocket::ListeningSocket(ListeningSocket &&LS)
-    : FD(LS.FD.load()), SocketPath(LS.SocketPath) {
+    : FD(LS.FD.load()),
+      SocketPath(LS.SocketPath), PipeFD{LS.PipeFD[0], LS.PipeFD[1]} {
 
-  LS.SocketPath.clear();
   LS.FD = -1;
+  LS.SocketPath.clear();
+  LS.PipeFD[0] = -1;
+  LS.PipeFD[1] = -1;
 }
 
 Expected<ListeningSocket>
@@ -135,36 +139,38 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
 
 #ifdef _WIN32
   WSABalancer _;
-  SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeSocket == INVALID_SOCKET) {
+  SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (Socket == INVALID_SOCKET) {
 #else
-  int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (MaybeSocket == -1) {
+  int Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (Socket == -1)
 #endif
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "socket create failed");
-  }
 
   struct sockaddr_un Addr = setSocketAddr(SocketPath);
-  if (::bind(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
+  if (::bind(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
     // Grab error code from call to ::bind before calling ::close
     std::error_code EC = getLastSocketErrorCode();
-    ::close(MaybeSocket);
+    ::close(Socket);
     return llvm::make_error<StringError>(EC, "Bind error");
   }
 
   // Mark socket as passive so incoming connections can be accepted
-  if (::listen(MaybeSocket, MaxBacklog) == -1)
+  if (::listen(Socket, MaxBacklog) == -1)
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "Listen error");
 
-  int Socket;
+  int PipeFD[2];
+  if (::pipe(PipeFD) == -1)
+    return llvm::make_error<StringError>(getLastSocketErrorCode(),
+                                         "pipe failed");
+
 #ifdef _WIN32
-  Socket = _open_osfhandle(MaybeWinsocket, 0);
+  return ListeningSocket{_open_osfhandle(Socket, 0), SocketPath, PipeFD};
 #else
-  Socket = MaybeSocket;
+  return ListeningSocket{Socket, SocketPath, PipeFD};
 #endif // _WIN32
-  return ListeningSocket{Socket, SocketPath};
 }
 
 Expected<std::unique_ptr<raw_socket_stream>>
@@ -180,12 +186,7 @@ ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
 #endif
 
   FDs[1].events = POLLIN;
-  PipeMutex.lock();
-  if (::pipe(PipeFD) == -1)
-    return llvm::make_error<StringError>(getLastSocketErrorCode(),
-                                         "pipe failed");
   FDs[1].fd = PipeFD[0];
-  PipeMutex.unlock();
 
   int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
   int PollStatus = ::poll(FDs, 2, TimeoutCount);
@@ -223,14 +224,22 @@ void ListeningSocket::shutdown() {
   ::close(FD);
   ::unlink(SocketPath.c_str());
 
+  // Ensure ::poll returns if shutdown is called by a separate thread
   char Byte = 'A';
-  PipeMutex.lock();
-  write(PipeFD[1], &Byte, 1);
-  PipeMutex.unlock();
+  ::write(PipeFD[1], &Byte, 1);
+
   FD = -1;
 }
 
-ListeningSocket::~ListeningSocket() { shutdown(); }
+ListeningSocket::~ListeningSocket() {
+  shutdown();
+
+  // Close the pipe's FDs in the destructor instead of within
+  // ListeningSocket::shutdown to avoid unnecessary synchronization issues that
+  // would occur as PipeFD's values would have to be changed to -1
+  ::close(PipeFD[0]);
+  ::close(PipeFD[1]);
+}
 
 //===----------------------------------------------------------------------===//
 //  raw_socket_stream

>From e1bf4d12f6da1b5718fe7016aa10a62f372c0ff7 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Sun, 10 Mar 2024 20:33:41 -0400
Subject: [PATCH 6/7] Add thread headerfile to unit test

---
 llvm/unittests/Support/raw_socket_stream_test.cpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 179b9d89f03c1f..9989ac8ca174f2 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -9,6 +9,7 @@
 #include <future>
 #include <iostream>
 #include <stdlib.h>
+#include <thread>
 
 #ifdef _WIN32
 #include "llvm/Support/Windows/WindowsSupport.h"

>From 002ef00c8eddd903d76230ef65e6ef9d703992ba Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Sun, 10 Mar 2024 23:39:01 -0400
Subject: [PATCH 7/7] WIP - fix windows build

---
 llvm/lib/Support/raw_socket_stream.cpp | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 76c4c4bd5a5336..0e9731e6227f5a 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -18,12 +18,12 @@
 
 #include <atomic>
 #include <fcntl.h>
-#include <poll.h>
 #include <thread>
 
 #ifndef _WIN32
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <poll.h>
 #else
 #include "llvm/Support/Windows/WindowsSupport.h"
 // winsock2.h must be included before afunix.h. Briefly turn off clang-format to
@@ -162,7 +162,12 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
                                          "Listen error");
 
   int PipeFD[2];
+#ifdef _WIN32
+  // Reserve 1 byte for the pipe and use default textmode
+  if (::_pipe(PipeFD, 1, 0) == -1)
+#else
   if (::pipe(PipeFD) == -1)
+#endif // _WIN32
     return llvm::make_error<StringError>(getLastSocketErrorCode(),
                                          "pipe failed");
 



More information about the llvm-commits mailing list