[llvm] Raw socket stream (PR #84710)
Connor Sughrue via llvm-commits
llvm-commits at lists.llvm.org
Sun Mar 10 21:24:31 PDT 2024
https://github.com/cpsughrue updated https://github.com/llvm/llvm-project/pull/84710
>From 4569e17123371fa348bbb7f314a1fc898623d249 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 14 Feb 2024 04:08:33 -0500
Subject: [PATCH 1/7] Improvements to raw_socket_stream functionality and
documentation
---
llvm/include/llvm/Support/raw_socket_stream.h | 72 +++++-
llvm/lib/Support/raw_socket_stream.cpp | 210 ++++++++++++------
.../Support/raw_socket_stream_test.cpp | 7 +-
3 files changed, 218 insertions(+), 71 deletions(-)
diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index c219792d82465d..e41aed755409da 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -17,12 +17,18 @@
#include "llvm/Support/Threading.h"
#include "llvm/Support/raw_ostream.h"
+#include <atomic>
+#include <chrono>
+
namespace llvm {
class raw_socket_stream;
-// Make sure that calls to WSAStartup and WSACleanup are balanced.
#ifdef _WIN32
+/// @brief Ensures proper initialization and cleanup of winsock resources
+///
+/// @details
+/// Make sure that calls to WSAStartup and WSACleanup are balanced.
class WSABalancer {
public:
WSABalancer();
@@ -30,22 +36,73 @@ class WSABalancer {
};
#endif // _WIN32
+/// @class ListeningSocket
+/// @brief Manages a passive (i.e., listening) UNIX domain socket
+///
+/// The ListeningSocket class encapsulates a UNIX domain socket that can listen
+/// and accept incoming connections. ListeningSocket is portable and supports
+/// Windows builds begining with Insider Build 17063. ListeningSocket is
+/// designed for server-side operations, working alongside raw_socket_streams
+/// that function as client connections.
+///
+/// Usage example:
+/// @code{.cpp}
+/// std::string Path = "/path/to/socket"
+/// Expected<ListeningSocket> S = ListeningSocket::createListeningSocket(Path);
+///
+/// if (listeningSocket) {
+/// auto connection = S->accept();
+/// if (connection) {
+/// // Use the accepted raw_socket_stream for communication.
+/// }
+/// }
+/// @endcode
+///
class ListeningSocket {
- int FD;
+ std::atomic<int> FD;
std::string SocketPath;
ListeningSocket(int SocketFD, StringRef SocketPath);
+
#ifdef _WIN32
WSABalancer _;
#endif // _WIN32
public:
- static Expected<ListeningSocket> createUnix(
+ ~ListeningSocket();
+ ListeningSocket(ListeningSocket &&LS);
+ ListeningSocket(const ListeningSocket &LS) = delete;
+ ListeningSocket &operator=(const ListeningSocket &) = delete;
+
+ /// Closes the socket's FD and unlinks the socket file from the file system.
+ /// The method is idempotent
+ void shutdown();
+
+ /// Accepts an incoming connection on the listening socket. This method can
+ /// optionally either block until a connection is available or timeout after a
+ /// specified amount of time has passed. By default the method will block
+ /// until the socket has recieved a connection
+ ///
+ /// @param Timeout An optional timeout duration in microseconds
+ ///
+ Expected<std::unique_ptr<raw_socket_stream>>
+ accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
+
+ /// Creates a listening socket bound to the specified file system path.
+ /// Handles the socket creation, binding, and immediately starts listening for
+ /// incoming connections.
+ ///
+ /// @param SocketPath The file system path where the socket will be created
+ /// @param MaxBacklog The max number of connections in a socket's backlog
+ ///
+ static Expected<ListeningSocket> createListeningSocket(
StringRef SocketPath,
int MaxBacklog = llvm::hardware_concurrency().compute_thread_count());
- Expected<std::unique_ptr<raw_socket_stream>> accept();
- ListeningSocket(ListeningSocket &&LS);
- ~ListeningSocket();
};
+
+//===----------------------------------------------------------------------===//
+// raw_socket_stream
+//===----------------------------------------------------------------------===//
+
class raw_socket_stream : public raw_fd_stream {
uint64_t current_pos() const override { return 0; }
#ifdef _WIN32
@@ -53,11 +110,12 @@ class raw_socket_stream : public raw_fd_stream {
#endif // _WIN32
public:
+ // TODO: Should probably be private
raw_socket_stream(int SocketFD);
/// Create a \p raw_socket_stream connected to the Unix domain socket at \p
/// SocketPath.
static Expected<std::unique_ptr<raw_socket_stream>>
- createConnectedUnix(StringRef SocketPath);
+ createConnectedSocket(StringRef SocketPath);
~raw_socket_stream();
};
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index afb0ed11b2c24e..46385c7ea29b87 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -14,6 +14,9 @@
#include "llvm/Support/raw_socket_stream.h"
#include "llvm/Config/config.h"
#include "llvm/Support/Error.h"
+#include "llvm/Support/FileSystem.h"
+
+#include <atomic>
#ifndef _WIN32
#include <sys/socket.h>
@@ -45,7 +48,6 @@ WSABalancer::WSABalancer() {
}
WSABalancer::~WSABalancer() { WSACleanup(); }
-
#endif // _WIN32
static std::error_code getLastSocketErrorCode() {
@@ -56,117 +58,201 @@ static std::error_code getLastSocketErrorCode() {
#endif
}
+static void closeFD(int FD) {
+#ifdef _WIN32
+ // on windows ::close is a deprecated alias for _close
+ _close(FD);
+#else
+ ::close(FD);
+#endif
+}
+
+static void unlinkFile(StringRef Path) {
+#ifdef _WIN32
+ // on windows ::unlink is a deprecated alias for _unlink
+ _unlink(Path.str().c_str());
+#else
+ ::unlink(Path.str().c_str());
+#endif
+}
+
+static sockaddr_un setSocketAddr(StringRef SocketPath) {
+ struct sockaddr_un Addr;
+ memset(&Addr, 0, sizeof(Addr));
+ Addr.sun_family = AF_UNIX;
+ strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
+ return Addr;
+}
+
+static Expected<int> getSocketFD(StringRef SocketPath) {
+#ifdef _WIN32
+ SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (MaybeSocket == INVALID_SOCKET) {
+#else
+ int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (MaybeSocket == -1) {
+#endif // _WIN32
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "Create socket failed");
+ }
+
+ struct sockaddr_un Addr = setSocketAddr(SocketPath);
+ if (::connect(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "Connect socket failed");
+
+#ifdef _WIN32
+ return _open_osfhandle(MaybeWinsocket, 0);
+#else
+ return MaybeSocket;
+#endif // _WIN32
+}
+
ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath)
: FD(SocketFD), SocketPath(SocketPath) {}
ListeningSocket::ListeningSocket(ListeningSocket &&LS)
- : FD(LS.FD), SocketPath(LS.SocketPath) {
+ : FD(LS.FD.load()), SocketPath(LS.SocketPath) {
+
+ LS.SocketPath.clear();
LS.FD = -1;
}
-Expected<ListeningSocket> ListeningSocket::createUnix(StringRef SocketPath,
- int MaxBacklog) {
+Expected<ListeningSocket>
+ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
+
+ // Handle instances where the target socket address already exists and
+ // differentiate between a preexisting file with and without a bound socket
+ //
+ // ::bind will return std::errc::address_in_use if a file at the socket address
+ // already exists (e.g., the file was not properly unlinked due to a crash)
+ // even if another socket has not yet been bound to that address
+ if (llvm::sys::fs::exists(SocketPath)) {
+ Expected<int> MaybeFD = getSocketFD(SocketPath);
+ if (!MaybeFD) {
+
+ // Regardless of the error, notify the caller that a file already exists
+ // at the desired socket address. The file must be removed before ::bind
+ // can use the socket address
+ consumeError(MaybeFD.takeError());
+ return llvm::make_error<StringError>(
+ std::make_error_code(std::errc::file_exists),
+ "Socket address unavailable");
+ }
+ closeFD(std::move(*MaybeFD));
+
+ // Notify caller that the provided socket address already has a bound socket
+ return llvm::make_error<StringError>(
+ std::make_error_code(std::errc::address_in_use),
+ "Socket address unavailable");
+ }
#ifdef _WIN32
WSABalancer _;
- SOCKET MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeWinsocket == INVALID_SOCKET) {
+ SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (MaybeSocket == INVALID_SOCKET) {
#else
- int MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeWinsocket == -1) {
+ int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (MaybeSocket == -1) {
#endif
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"socket create failed");
}
- struct sockaddr_un Addr;
- memset(&Addr, 0, sizeof(Addr));
- Addr.sun_family = AF_UNIX;
- strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
-
- if (bind(MaybeWinsocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
- std::error_code Err = getLastSocketErrorCode();
- if (Err == std::errc::address_in_use)
- ::close(MaybeWinsocket);
- return llvm::make_error<StringError>(Err, "Bind error");
+ struct sockaddr_un Addr = setSocketAddr(SocketPath);
+ if (::bind(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
+ // Grab error code from call to ::bind before calling ::close
+ std::error_code EC = getLastSocketErrorCode();
+ ::close(MaybeSocket);
+ return llvm::make_error<StringError>(EC, "Bind error");
}
- if (listen(MaybeWinsocket, MaxBacklog) == -1) {
+
+ // Mark socket as passive so incoming connections can be accepted
+ if (::listen(MaybeSocket, MaxBacklog) == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"Listen error");
- }
- int UnixSocket;
+
+ int Socket;
#ifdef _WIN32
- UnixSocket = _open_osfhandle(MaybeWinsocket, 0);
+ Socket = _open_osfhandle(MaybeWinsocket, 0);
#else
- UnixSocket = MaybeWinsocket;
+ Socket = MaybeSocket;
#endif // _WIN32
- return ListeningSocket{UnixSocket, SocketPath};
+ return ListeningSocket{Socket, SocketPath};
}
-Expected<std::unique_ptr<raw_socket_stream>> ListeningSocket::accept() {
+Expected<std::unique_ptr<raw_socket_stream>>
+ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
+
+ int SelectStatus;
int AcceptFD;
+
#ifdef _WIN32
SOCKET WinServerSock = _get_osfhandle(FD);
- SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
- AcceptFD = _open_osfhandle(WinAcceptSock, 0);
+#endif
+
+ fd_set Readfds;
+ if (Timeout.has_value()) {
+ timeval TV = {0, Timeout.value().count()};
+ FD_ZERO(&Readfds);
+#ifdef _WIN32
+ FD_SET(WinServerSock, &Readfds);
+#else
+ FD_SET(FD, &Readfds);
+#endif
+ SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
+ } else {
+ SelectStatus = 1;
+ }
+
+ if (SelectStatus == -1)
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "Select failed");
+
+ if (SelectStatus) {
+#ifdef _WIN32
+ SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
+ AcceptFD = _open_osfhandle(WinAcceptSock, 0);
#else
- AcceptFD = ::accept(FD, NULL, NULL);
-#endif //_WIN32
+ AcceptFD = ::accept(FD, NULL, NULL);
+#endif
+ } else
+ return llvm::make_error<StringError>(
+ std::make_error_code(std::errc::timed_out), "Accept timed out");
+
if (AcceptFD == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"Accept failed");
+
return std::make_unique<raw_socket_stream>(AcceptFD);
}
-ListeningSocket::~ListeningSocket() {
+void ListeningSocket::shutdown() {
if (FD == -1)
return;
- ::close(FD);
- unlink(SocketPath.c_str());
+ closeFD(FD);
+ unlinkFile(SocketPath);
+ FD = -1;
}
-static Expected<int> GetSocketFD(StringRef SocketPath) {
-#ifdef _WIN32
- SOCKET MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeWinsocket == INVALID_SOCKET) {
-#else
- int MaybeWinsocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeWinsocket == -1) {
-#endif // _WIN32
- return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "Create socket failed");
- }
+ListeningSocket::~ListeningSocket() { shutdown(); }
- struct sockaddr_un Addr;
- memset(&Addr, 0, sizeof(Addr));
- Addr.sun_family = AF_UNIX;
- strncpy(Addr.sun_path, SocketPath.str().c_str(), sizeof(Addr.sun_path) - 1);
-
- int status = connect(MaybeWinsocket, (struct sockaddr *)&Addr, sizeof(Addr));
- if (status == -1) {
- return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "Connect socket failed");
- }
-#ifdef _WIN32
- return _open_osfhandle(MaybeWinsocket, 0);
-#else
- return MaybeWinsocket;
-#endif // _WIN32
-}
+//===----------------------------------------------------------------------===//
+// raw_socket_stream
+//===----------------------------------------------------------------------===//
raw_socket_stream::raw_socket_stream(int SocketFD)
: raw_fd_stream(SocketFD, true) {}
Expected<std::unique_ptr<raw_socket_stream>>
-raw_socket_stream::createConnectedUnix(StringRef SocketPath) {
+raw_socket_stream::createConnectedSocket(StringRef SocketPath) {
#ifdef _WIN32
WSABalancer _;
#endif // _WIN32
- Expected<int> FD = GetSocketFD(SocketPath);
+ Expected<int> FD = getSocketFD(SocketPath);
if (!FD)
return FD.takeError();
return std::make_unique<raw_socket_stream>(*FD);
}
raw_socket_stream::~raw_socket_stream() {}
-
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 6903862e540315..e6dafe5e062d0b 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -35,16 +35,19 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
llvm::sys::fs::createUniquePath("test_raw_socket_stream.sock", SocketPath,
true);
+ // Make sure socket file does not exist. May still be there from the last test
+ std::remove(SocketPath.c_str());
+
char Bytes[8];
Expected<ListeningSocket> MaybeServerListener =
- ListeningSocket::createUnix(SocketPath);
+ ListeningSocket::createListeningSocket(SocketPath);
ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
ListeningSocket ServerListener = std::move(*MaybeServerListener);
Expected<std::unique_ptr<raw_socket_stream>> MaybeClient =
- raw_socket_stream::createConnectedUnix(SocketPath);
+ raw_socket_stream::createConnectedSocket(SocketPath);
ASSERT_THAT_EXPECTED(MaybeClient, llvm::Succeeded());
raw_socket_stream &Client = **MaybeClient;
>From 16d3f23ec97ca7120f43fcc10fe6e2262a95cbcd Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 00:56:00 -0500
Subject: [PATCH 2/7] Address feedback
- use ::close and ::unlink instead of _close and _unlink on Windows
- fix name of raw_socket_stream functions to specify Unix
- use \ instead of @ in doxygen comments
- write unittest to make sure timeout works correctly
---
llvm/include/llvm/Support/raw_socket_stream.h | 38 ++++++------
llvm/lib/Support/raw_socket_stream.cpp | 58 ++++++-------------
.../Support/raw_socket_stream_test.cpp | 43 +++++++++++---
3 files changed, 73 insertions(+), 66 deletions(-)
diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index e41aed755409da..01f02bb8d6397d 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -25,9 +25,8 @@ namespace llvm {
class raw_socket_stream;
#ifdef _WIN32
-/// @brief Ensures proper initialization and cleanup of winsock resources
+/// \brief Ensures proper initialization and cleanup of winsock resources
///
-/// @details
/// Make sure that calls to WSAStartup and WSACleanup are balanced.
class WSABalancer {
public:
@@ -36,27 +35,27 @@ class WSABalancer {
};
#endif // _WIN32
-/// @class ListeningSocket
-/// @brief Manages a passive (i.e., listening) UNIX domain socket
+/// \class ListeningSocket
+/// \brief Manages a passive (i.e., listening) UNIX domain socket
///
/// The ListeningSocket class encapsulates a UNIX domain socket that can listen
/// and accept incoming connections. ListeningSocket is portable and supports
/// Windows builds begining with Insider Build 17063. ListeningSocket is
-/// designed for server-side operations, working alongside raw_socket_streams
+/// designed for server-side operations, working alongside \p raw_socket_streams
/// that function as client connections.
///
/// Usage example:
-/// @code{.cpp}
+/// \code{.cpp}
/// std::string Path = "/path/to/socket"
/// Expected<ListeningSocket> S = ListeningSocket::createListeningSocket(Path);
///
-/// if (listeningSocket) {
-/// auto connection = S->accept();
-/// if (connection) {
-/// // Use the accepted raw_socket_stream for communication.
-/// }
+/// if (S) {
+/// Expected<std::unique_ptr<raw_socket_stream>> connection = S->accept();
+/// if (connection) {
+/// // Use the accepted raw_socket_stream for communication.
+/// }
/// }
-/// @endcode
+/// \endcode
///
class ListeningSocket {
std::atomic<int> FD;
@@ -74,7 +73,7 @@ class ListeningSocket {
ListeningSocket &operator=(const ListeningSocket &) = delete;
/// Closes the socket's FD and unlinks the socket file from the file system.
- /// The method is idempotent
+ /// The method is thread and signal safe
void shutdown();
/// Accepts an incoming connection on the listening socket. This method can
@@ -82,7 +81,7 @@ class ListeningSocket {
/// specified amount of time has passed. By default the method will block
/// until the socket has recieved a connection
///
- /// @param Timeout An optional timeout duration in microseconds
+ /// \param Timeout An optional timeout duration in microseconds
///
Expected<std::unique_ptr<raw_socket_stream>>
accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
@@ -91,10 +90,10 @@ class ListeningSocket {
/// Handles the socket creation, binding, and immediately starts listening for
/// incoming connections.
///
- /// @param SocketPath The file system path where the socket will be created
- /// @param MaxBacklog The max number of connections in a socket's backlog
+ /// \param SocketPath The file system path where the socket will be created
+ /// \param MaxBacklog The max number of connections in a socket's backlog
///
- static Expected<ListeningSocket> createListeningSocket(
+ static Expected<ListeningSocket> createListeningUnixSocket(
StringRef SocketPath,
int MaxBacklog = llvm::hardware_concurrency().compute_thread_count());
};
@@ -110,12 +109,11 @@ class raw_socket_stream : public raw_fd_stream {
#endif // _WIN32
public:
- // TODO: Should probably be private
raw_socket_stream(int SocketFD);
- /// Create a \p raw_socket_stream connected to the Unix domain socket at \p
+ /// Create a \p raw_socket_stream connected to the UNIX domain socket at \p
/// SocketPath.
static Expected<std::unique_ptr<raw_socket_stream>>
- createConnectedSocket(StringRef SocketPath);
+ createConnectedUnixSocket(StringRef SocketPath);
~raw_socket_stream();
};
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 46385c7ea29b87..fafc880a7105b7 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -58,24 +58,6 @@ static std::error_code getLastSocketErrorCode() {
#endif
}
-static void closeFD(int FD) {
-#ifdef _WIN32
- // on windows ::close is a deprecated alias for _close
- _close(FD);
-#else
- ::close(FD);
-#endif
-}
-
-static void unlinkFile(StringRef Path) {
-#ifdef _WIN32
- // on windows ::unlink is a deprecated alias for _unlink
- _unlink(Path.str().c_str());
-#else
- ::unlink(Path.str().c_str());
-#endif
-}
-
static sockaddr_un setSocketAddr(StringRef SocketPath) {
struct sockaddr_un Addr;
memset(&Addr, 0, sizeof(Addr));
@@ -119,7 +101,8 @@ ListeningSocket::ListeningSocket(ListeningSocket &&LS)
}
Expected<ListeningSocket>
-ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
+ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
+ int MaxBacklog) {
// Handle instances where the target socket address already exists and
// differentiate between a preexisting file with and without a bound socket
@@ -132,14 +115,14 @@ ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
if (!MaybeFD) {
// Regardless of the error, notify the caller that a file already exists
- // at the desired socket address. The file must be removed before ::bind
- // can use the socket address
+ // at the desired socket address and that there is no bound socket at that
+ // address. The file must be removed before ::bind can use the address
consumeError(MaybeFD.takeError());
return llvm::make_error<StringError>(
std::make_error_code(std::errc::file_exists),
"Socket address unavailable");
}
- closeFD(std::move(*MaybeFD));
+ ::close(std::move(*MaybeFD));
// Notify caller that the provided socket address already has a bound socket
return llvm::make_error<StringError>(
@@ -184,31 +167,28 @@ ListeningSocket::createListeningSocket(StringRef SocketPath, int MaxBacklog) {
Expected<std::unique_ptr<raw_socket_stream>>
ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
- int SelectStatus;
- int AcceptFD;
-
+ fd_set Readfds;
+ FD_ZERO(&Readfds);
#ifdef _WIN32
SOCKET WinServerSock = _get_osfhandle(FD);
+ FD_SET(WinServerSock, &Readfds);
+#else
+ FD_SET(FD, &Readfds);
#endif
- fd_set Readfds;
+ int SelectStatus;
if (Timeout.has_value()) {
timeval TV = {0, Timeout.value().count()};
- FD_ZERO(&Readfds);
-#ifdef _WIN32
- FD_SET(WinServerSock, &Readfds);
-#else
- FD_SET(FD, &Readfds);
-#endif
SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
} else {
- SelectStatus = 1;
+ SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, NULL);
}
if (SelectStatus == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "Select failed");
+ "select failed");
+ int AcceptFD;
if (SelectStatus) {
#ifdef _WIN32
SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
@@ -218,11 +198,11 @@ ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
#endif
} else
return llvm::make_error<StringError>(
- std::make_error_code(std::errc::timed_out), "Accept timed out");
+ std::make_error_code(std::errc::timed_out), "accept timed out");
if (AcceptFD == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "Accept failed");
+ "accept failed");
return std::make_unique<raw_socket_stream>(AcceptFD);
}
@@ -230,8 +210,8 @@ ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
void ListeningSocket::shutdown() {
if (FD == -1)
return;
- closeFD(FD);
- unlinkFile(SocketPath);
+ ::close(FD);
+ ::unlink(SocketPath.c_str());
FD = -1;
}
@@ -245,7 +225,7 @@ raw_socket_stream::raw_socket_stream(int SocketFD)
: raw_fd_stream(SocketFD, true) {}
Expected<std::unique_ptr<raw_socket_stream>>
-raw_socket_stream::createConnectedSocket(StringRef SocketPath) {
+raw_socket_stream::createConnectedUnixSocket(StringRef SocketPath) {
#ifdef _WIN32
WSABalancer _;
#endif // _WIN32
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index e6dafe5e062d0b..4ef9204bfa8a9a 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -32,22 +32,19 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
GTEST_SKIP();
SmallString<100> SocketPath;
- llvm::sys::fs::createUniquePath("test_raw_socket_stream.sock", SocketPath,
- true);
+ llvm::sys::fs::createUniquePath("client_server_comms.sock", SocketPath, true);
// Make sure socket file does not exist. May still be there from the last test
std::remove(SocketPath.c_str());
- char Bytes[8];
-
Expected<ListeningSocket> MaybeServerListener =
- ListeningSocket::createListeningSocket(SocketPath);
+ ListeningSocket::createListeningUnixSocket(SocketPath);
ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
ListeningSocket ServerListener = std::move(*MaybeServerListener);
Expected<std::unique_ptr<raw_socket_stream>> MaybeClient =
- raw_socket_stream::createConnectedSocket(SocketPath);
+ raw_socket_stream::createConnectedUnixSocket(SocketPath);
ASSERT_THAT_EXPECTED(MaybeClient, llvm::Succeeded());
raw_socket_stream &Client = **MaybeClient;
@@ -61,6 +58,7 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
Client << "01234567";
Client.flush();
+ char Bytes[8];
ssize_t BytesRead = Server.read(Bytes, 8);
std::string string(Bytes, 8);
@@ -68,4 +66,35 @@ TEST(raw_socket_streamTest, CLIENT_TO_SERVER_AND_SERVER_TO_CLIENT) {
ASSERT_EQ(8, BytesRead);
ASSERT_EQ("01234567", string);
}
-} // namespace
\ No newline at end of file
+
+TEST(raw_socket_streamTest, TIMEOUT_PROVIDED) {
+ if (!hasUnixSocketSupport())
+ GTEST_SKIP();
+
+ SmallString<100> SocketPath;
+ llvm::sys::fs::createUniquePath("timout_provided.sock", SocketPath, true);
+
+ // Make sure socket file does not exist. May still be there from the last test
+ std::remove(SocketPath.c_str());
+
+ Expected<ListeningSocket> MaybeServerListener =
+ ListeningSocket::createListeningUnixSocket(SocketPath);
+ ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
+ ListeningSocket ServerListener = std::move(*MaybeServerListener);
+
+ std::chrono::seconds Timeout = std::chrono::seconds(5);
+ auto Start = std::chrono::steady_clock::now();
+ Expected<std::unique_ptr<raw_socket_stream>> MaybeServer =
+ ServerListener.accept(Timeout);
+ auto End = std::chrono::steady_clock::now();
+ auto Duration = std::chrono::duration_cast<std::chrono::seconds>(End - Start);
+ ASSERT_NEAR(Duration.count(), Timeout.count(), 1);
+
+ ASSERT_THAT_EXPECTED(MaybeServer, Failed());
+ llvm::Error Err = MaybeServer.takeError();
+ llvm::handleAllErrors(std::move(Err), [&](const llvm::StringError &SE) {
+ std::error_code EC = SE.convertToErrorCode();
+ ASSERT_EQ(EC, std::errc::timed_out);
+ });
+}
+} // namespace
>From 8546986f41271de2d422d7ac2215861065facf98 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 12:46:03 -0500
Subject: [PATCH 3/7] Use poll() instead of select() to manage timeout
---
llvm/include/llvm/Support/raw_socket_stream.h | 4 +-
llvm/lib/Support/raw_socket_stream.cpp | 38 +++++++++----------
2 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 01f02bb8d6397d..570f2f73436c68 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -81,10 +81,10 @@ class ListeningSocket {
/// specified amount of time has passed. By default the method will block
/// until the socket has recieved a connection
///
- /// \param Timeout An optional timeout duration in microseconds
+ /// \param Timeout An optional timeout duration in milliseconds
///
Expected<std::unique_ptr<raw_socket_stream>>
- accept(std::optional<std::chrono::microseconds> Timeout = std::nullopt);
+ accept(std::optional<std::chrono::milliseconds> Timeout = std::nullopt);
/// Creates a listening socket bound to the specified file system path.
/// Handles the socket creation, binding, and immediately starts listening for
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index fafc880a7105b7..a8001ade3205a4 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -17,6 +17,7 @@
#include "llvm/Support/FileSystem.h"
#include <atomic>
+#include <poll.h>
#ifndef _WIN32
#include <sys/socket.h>
@@ -165,45 +166,44 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
}
Expected<std::unique_ptr<raw_socket_stream>>
-ListeningSocket::accept(std::optional<std::chrono::microseconds> Timeout) {
+ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
- fd_set Readfds;
- FD_ZERO(&Readfds);
+ struct pollfd FDs[1];
+ FDs[0].events = POLLIN;
#ifdef _WIN32
SOCKET WinServerSock = _get_osfhandle(FD);
- FD_SET(WinServerSock, &Readfds);
+ FDs[0].fd = WinServerSock;
#else
- FD_SET(FD, &Readfds);
+ FDs[0].fd = FD;
#endif
- int SelectStatus;
- if (Timeout.has_value()) {
- timeval TV = {0, Timeout.value().count()};
- SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, &TV);
- } else {
- SelectStatus = ::select(FD + 1, &Readfds, NULL, NULL, NULL);
- }
+ int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
+ int PollStatus = ::poll(FDs, 1, TimeoutCount);
- if (SelectStatus == -1)
+ if (PollStatus == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "select failed");
+ "poll failed");
+ if (PollStatus == 0)
+ return llvm::make_error<StringError>(
+ std::make_error_code(std::errc::timed_out),
+ "No client requests within timeout window");
+
+ if (FDs[0].revents & POLLNVAL)
+ return llvm::make_error<StringError>(
+ std::make_error_code(std::errc::bad_file_descriptor),
+ "File descriptor closed by another thread");
int AcceptFD;
- if (SelectStatus) {
#ifdef _WIN32
SOCKET WinAcceptSock = ::accept(WinServerSock, NULL, NULL);
AcceptFD = _open_osfhandle(WinAcceptSock, 0);
#else
AcceptFD = ::accept(FD, NULL, NULL);
#endif
- } else
- return llvm::make_error<StringError>(
- std::make_error_code(std::errc::timed_out), "accept timed out");
if (AcceptFD == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"accept failed");
-
return std::make_unique<raw_socket_stream>(AcceptFD);
}
>From a55d698c87948faaa2ad25812787047d5158da37 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 16:05:06 -0500
Subject: [PATCH 4/7] WIP (not portable) - Add self-pipe trick to
ListeningSocket
---
llvm/include/llvm/Support/raw_socket_stream.h | 9 ++++-
llvm/lib/Support/raw_socket_stream.cpp | 19 ++++++++--
.../Support/raw_socket_stream_test.cpp | 36 +++++++++++++++++++
3 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 570f2f73436c68..57199b5960b267 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -58,8 +58,15 @@ class WSABalancer {
/// \endcode
///
class ListeningSocket {
+
+ /// If ListeningSocket::shutdown is called from a signal handler to clean up
+ /// ListeningSocket resources, FD may be closed while ::poll is waiting for
+ /// FD to become ready for I/O. Closing FD does not wake ::poll, so the
+ /// self-pipe trick is used to make ::poll return
+ int PipeFD[2];
+ std::mutex PipeMutex;
std::atomic<int> FD;
- std::string SocketPath;
+ std::string SocketPath; // Never modified
ListeningSocket(int SocketFD, StringRef SocketPath);
#ifdef _WIN32
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index a8001ade3205a4..09d67657323f78 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -17,7 +17,9 @@
#include "llvm/Support/FileSystem.h"
#include <atomic>
+#include <fcntl.h>
#include <poll.h>
+#include <thread>
#ifndef _WIN32
#include <sys/socket.h>
@@ -168,7 +170,7 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
Expected<std::unique_ptr<raw_socket_stream>>
ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
- struct pollfd FDs[1];
+ struct pollfd FDs[2];
FDs[0].events = POLLIN;
#ifdef _WIN32
SOCKET WinServerSock = _get_osfhandle(FD);
@@ -177,8 +179,16 @@ ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
FDs[0].fd = FD;
#endif
+ FDs[1].events = POLLIN;
+ PipeMutex.lock();
+ if (::pipe(PipeFD) == -1)
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "pipe failed");
+ FDs[1].fd = PipeFD[0];
+ PipeMutex.unlock();
+
int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
- int PollStatus = ::poll(FDs, 1, TimeoutCount);
+ int PollStatus = ::poll(FDs, 2, TimeoutCount);
if (PollStatus == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
@@ -212,6 +222,11 @@ void ListeningSocket::shutdown() {
return;
::close(FD);
::unlink(SocketPath.c_str());
+
+ char Byte = 'A';
+ PipeMutex.lock();
+ write(PipeFD[1], &Byte, 1);
+ PipeMutex.unlock();
FD = -1;
}
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 4ef9204bfa8a9a..179b9d89f03c1f 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -97,4 +97,40 @@ TEST(raw_socket_streamTest, TIMEOUT_PROVIDED) {
ASSERT_EQ(EC, std::errc::timed_out);
});
}
+
+TEST(raw_socket_streamTest, FILE_DESCRIPTOR_CLOSED) {
+ if (!hasUnixSocketSupport())
+ GTEST_SKIP();
+
+ SmallString<100> SocketPath;
+ llvm::sys::fs::createUniquePath("fd_closed.sock", SocketPath, true);
+
+ // Make sure socket file does not exist. May still be there from the last test
+ std::remove(SocketPath.c_str());
+
+ Expected<ListeningSocket> MaybeServerListener =
+ ListeningSocket::createListeningUnixSocket(SocketPath);
+ ASSERT_THAT_EXPECTED(MaybeServerListener, llvm::Succeeded());
+ ListeningSocket ServerListener = std::move(*MaybeServerListener);
+
+ // Create a separate thread to close the socket after a delay. Simulates a
+ // signal handler calling ServerListener::shutdown
+ std::thread CloseThread([&]() {
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ ServerListener.shutdown();
+ });
+
+ Expected<std::unique_ptr<raw_socket_stream>> MaybeServer =
+ ServerListener.accept();
+
+ // Wait for the CloseThread to finish
+ CloseThread.join();
+
+ ASSERT_THAT_EXPECTED(MaybeServer, Failed());
+ llvm::Error Err = MaybeServer.takeError();
+ llvm::handleAllErrors(std::move(Err), [&](const llvm::StringError &SE) {
+ std::error_code EC = SE.convertToErrorCode();
+ ASSERT_EQ(EC, std::errc::bad_file_descriptor);
+ });
+}
} // namespace
>From 0520fd9e7b1e90c7108839e408900acaf44f1ae8 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Wed, 6 Mar 2024 20:42:52 -0500
Subject: [PATCH 5/7] Fix some windows build issues
---
llvm/include/llvm/Support/raw_socket_stream.h | 29 +++++---
llvm/lib/Support/raw_socket_stream.cpp | 73 +++++++++++--------
2 files changed, 59 insertions(+), 43 deletions(-)
diff --git a/llvm/include/llvm/Support/raw_socket_stream.h b/llvm/include/llvm/Support/raw_socket_stream.h
index 57199b5960b267..b3ab9f70bbe606 100644
--- a/llvm/include/llvm/Support/raw_socket_stream.h
+++ b/llvm/include/llvm/Support/raw_socket_stream.h
@@ -59,15 +59,16 @@ class WSABalancer {
///
class ListeningSocket {
- /// If ListeningSocket::shutdown is used by a signal handler to clean up
- /// ListeningSocket resources FD may be closed while ::poll is waiting for
- /// FD to become ready to perform I/O. When FD is closed ::poll will
- /// continue to block so use the self-pipe trick to get ::poll to return
- int PipeFD[2];
- std::mutex PipeMutex;
std::atomic<int> FD;
- std::string SocketPath; // Never modified
- ListeningSocket(int SocketFD, StringRef SocketPath);
+ std::string SocketPath; // Never modified after construction
+
+ /// If a separate thread calls ListeningSocket::shutdown, the ListeningSocket
+ /// file descriptor (FD) could be closed while ::poll is waiting for it to be
+ /// ready to perform I/O operations. ::poll will continue to block even after
+ /// FD is closed so use a self-pipe mechanism to get ::poll to return
+ int PipeFD[2]; // Never modified after construction
+
+ ListeningSocket(int SocketFD, StringRef SocketPath, int PipeFD[2]);
#ifdef _WIN32
WSABalancer _;
@@ -79,14 +80,20 @@ class ListeningSocket {
ListeningSocket(const ListeningSocket &LS) = delete;
ListeningSocket &operator=(const ListeningSocket &) = delete;
- /// Closes the socket's FD and unlinks the socket file from the file system.
- /// The method is thread and signal safe
+ /// Closes the FD, unlinks the socket file, and writes to PipeFD.
+ ///
+ /// After the construction of the ListeningSocket, shutdown is signal safe if
+ /// it is called during the lifetime of the object. shutdown can be called
+ /// concurrently with ListeningSocket::accept as writing to PipeFD will cause
+ /// a blocking call to ::poll to return.
+ ///
+ /// Once shutdown is called there is no way to reinitialize ListeningSocket.
void shutdown();
/// Accepts an incoming connection on the listening socket. This method can
/// optionally either block until a connection is available or timeout after a
/// specified amount of time has passed. By default the method will block
- /// until the socket has recieved a connection
+ /// until the socket has received a connection.
///
/// \param Timeout An optional timeout duration in milliseconds
///
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 09d67657323f78..76c4c4bd5a5336 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -71,36 +71,40 @@ static sockaddr_un setSocketAddr(StringRef SocketPath) {
static Expected<int> getSocketFD(StringRef SocketPath) {
#ifdef _WIN32
- SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeSocket == INVALID_SOCKET) {
+ SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (Socket == INVALID_SOCKET) {
#else
- int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeSocket == -1) {
+ int Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (Socket == -1) {
#endif // _WIN32
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"Create socket failed");
}
struct sockaddr_un Addr = setSocketAddr(SocketPath);
- if (::connect(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
+ if (::connect(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"Connect socket failed");
#ifdef _WIN32
- return _open_osfhandle(MaybeWinsocket, 0);
+ return _open_osfhandle(Socket, 0);
#else
- return MaybeSocket;
+ return Socket;
#endif // _WIN32
}
-ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath)
- : FD(SocketFD), SocketPath(SocketPath) {}
+ListeningSocket::ListeningSocket(int SocketFD, StringRef SocketPath,
+ int PipeFD[2])
+ : FD(SocketFD), SocketPath(SocketPath), PipeFD{PipeFD[0], PipeFD[1]} {}
ListeningSocket::ListeningSocket(ListeningSocket &&LS)
- : FD(LS.FD.load()), SocketPath(LS.SocketPath) {
+ : FD(LS.FD.load()),
+ SocketPath(LS.SocketPath), PipeFD{LS.PipeFD[0], LS.PipeFD[1]} {
- LS.SocketPath.clear();
LS.FD = -1;
+ LS.SocketPath.clear();
+ LS.PipeFD[0] = -1;
+ LS.PipeFD[1] = -1;
}
Expected<ListeningSocket>
@@ -135,36 +139,38 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
#ifdef _WIN32
WSABalancer _;
- SOCKET MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeSocket == INVALID_SOCKET) {
+ SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (Socket == INVALID_SOCKET) {
#else
- int MaybeSocket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (MaybeSocket == -1) {
+ int Socket = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (Socket == -1)
#endif
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"socket create failed");
- }
struct sockaddr_un Addr = setSocketAddr(SocketPath);
- if (::bind(MaybeSocket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
+ if (::bind(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
// Grab error code from call to ::bind before calling ::close
std::error_code EC = getLastSocketErrorCode();
- ::close(MaybeSocket);
+ ::close(Socket);
return llvm::make_error<StringError>(EC, "Bind error");
}
// Mark socket as passive so incoming connections can be accepted
- if (::listen(MaybeSocket, MaxBacklog) == -1)
+ if (::listen(Socket, MaxBacklog) == -1)
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"Listen error");
- int Socket;
+ int PipeFD[2];
+ if (::pipe(PipeFD) == -1)
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "pipe failed");
+
#ifdef _WIN32
- Socket = _open_osfhandle(MaybeWinsocket, 0);
+ return ListeningSocket{_open_osfhandle(Socket, 0), SocketPath, PipeFD};
#else
- Socket = MaybeSocket;
+ return ListeningSocket{Socket, SocketPath, PipeFD};
#endif // _WIN32
- return ListeningSocket{Socket, SocketPath};
}
Expected<std::unique_ptr<raw_socket_stream>>
@@ -180,12 +186,7 @@ ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
#endif
FDs[1].events = POLLIN;
- PipeMutex.lock();
- if (::pipe(PipeFD) == -1)
- return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "pipe failed");
FDs[1].fd = PipeFD[0];
- PipeMutex.unlock();
int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
int PollStatus = ::poll(FDs, 2, TimeoutCount);
@@ -223,14 +224,22 @@ void ListeningSocket::shutdown() {
::close(FD);
::unlink(SocketPath.c_str());
+ // Ensure ::poll returns if shutdown is called by a separate thread
char Byte = 'A';
- PipeMutex.lock();
- write(PipeFD[1], &Byte, 1);
- PipeMutex.unlock();
+ ::write(PipeFD[1], &Byte, 1);
+
FD = -1;
}
-ListeningSocket::~ListeningSocket() { shutdown(); }
+ListeningSocket::~ListeningSocket() {
+ shutdown();
+
+ // Close the pipe's FDs in the destructor instead of within
+ // ListeningSocket::shutdown to avoid unnecessary synchronization issues that
+ // would occur as PipeFD's values would have to be changed to -1
+ ::close(PipeFD[0]);
+ ::close(PipeFD[1]);
+}
//===----------------------------------------------------------------------===//
// raw_socket_stream
>From e1bf4d12f6da1b5718fe7016aa10a62f372c0ff7 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Sun, 10 Mar 2024 20:33:41 -0400
Subject: [PATCH 6/7] Add thread headerfile to unit test
---
llvm/unittests/Support/raw_socket_stream_test.cpp | 1 +
1 file changed, 1 insertion(+)
diff --git a/llvm/unittests/Support/raw_socket_stream_test.cpp b/llvm/unittests/Support/raw_socket_stream_test.cpp
index 179b9d89f03c1f..9989ac8ca174f2 100644
--- a/llvm/unittests/Support/raw_socket_stream_test.cpp
+++ b/llvm/unittests/Support/raw_socket_stream_test.cpp
@@ -9,6 +9,7 @@
#include <future>
#include <iostream>
#include <stdlib.h>
+#include <thread>
#ifdef _WIN32
#include "llvm/Support/Windows/WindowsSupport.h"
>From 013b1f5c601f0e5e52e250deaf4e0e3354cfdb24 Mon Sep 17 00:00:00 2001
From: cpsughrue <cpsughrue at gmail.com>
Date: Sun, 10 Mar 2024 23:39:01 -0400
Subject: [PATCH 7/7] fix windows build
---
llvm/lib/Support/raw_socket_stream.cpp | 48 +++++++++++++++-----------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/llvm/lib/Support/raw_socket_stream.cpp b/llvm/lib/Support/raw_socket_stream.cpp
index 76c4c4bd5a5336..f1b84d272db214 100644
--- a/llvm/lib/Support/raw_socket_stream.cpp
+++ b/llvm/lib/Support/raw_socket_stream.cpp
@@ -18,12 +18,12 @@
#include <atomic>
#include <fcntl.h>
-#include <poll.h>
#include <thread>
#ifndef _WIN32
#include <sys/socket.h>
#include <sys/un.h>
+#include <poll.h>
#else
#include "llvm/Support/Windows/WindowsSupport.h"
// winsock2.h must be included before afunix.h. Briefly turn off clang-format to
@@ -140,7 +140,7 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
#ifdef _WIN32
WSABalancer _;
SOCKET Socket = socket(AF_UNIX, SOCK_STREAM, 0);
- if (Socket == INVALID_SOCKET) {
+ if (Socket == INVALID_SOCKET)
#else
int Socket = socket(AF_UNIX, SOCK_STREAM, 0);
if (Socket == -1)
@@ -148,30 +148,35 @@ ListeningSocket::createListeningUnixSocket(StringRef SocketPath,
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"socket create failed");
- struct sockaddr_un Addr = setSocketAddr(SocketPath);
- if (::bind(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
- // Grab error code from call to ::bind before calling ::close
- std::error_code EC = getLastSocketErrorCode();
- ::close(Socket);
- return llvm::make_error<StringError>(EC, "Bind error");
- }
+ struct sockaddr_un Addr = setSocketAddr(SocketPath);
+ if (::bind(Socket, (struct sockaddr *)&Addr, sizeof(Addr)) == -1) {
+ // Grab error code from call to ::bind before calling ::close
+ std::error_code EC = getLastSocketErrorCode();
+ ::close(Socket);
+ return llvm::make_error<StringError>(EC, "Bind error");
+ }
- // Mark socket as passive so incoming connections can be accepted
- if (::listen(Socket, MaxBacklog) == -1)
- return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "Listen error");
+ // Mark socket as passive so incoming connections can be accepted
+ if (::listen(Socket, MaxBacklog) == -1)
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "Listen error");
- int PipeFD[2];
+ int PipeFD[2];
+#ifdef _WIN32
+ // Reserve 1 byte for the pipe and use default textmode
+ if (::_pipe(PipeFD, 1, 0) == -1)
+#else
if (::pipe(PipeFD) == -1)
- return llvm::make_error<StringError>(getLastSocketErrorCode(),
- "pipe failed");
+#endif // _WIN32
+ return llvm::make_error<StringError>(getLastSocketErrorCode(),
+ "pipe failed");
#ifdef _WIN32
- return ListeningSocket{_open_osfhandle(Socket, 0), SocketPath, PipeFD};
+ return ListeningSocket{_open_osfhandle(Socket, 0), SocketPath, PipeFD};
#else
return ListeningSocket{Socket, SocketPath, PipeFD};
#endif // _WIN32
-}
+ }
Expected<std::unique_ptr<raw_socket_stream>>
ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
@@ -184,14 +189,17 @@ ListeningSocket::accept(std::optional<std::chrono::milliseconds> Timeout) {
#else
FDs[0].fd = FD;
#endif
-
FDs[1].events = POLLIN;
FDs[1].fd = PipeFD[0];
int TimeoutCount = Timeout.value_or(std::chrono::milliseconds(-1)).count();
+#ifdef _WIN32
+ int PollStatus = WSAPoll(FDs, 2, TimeoutCount);
+ if (PollStatus == SOCKET_ERROR)
+#else
int PollStatus = ::poll(FDs, 2, TimeoutCount);
-
if (PollStatus == -1)
+#endif
return llvm::make_error<StringError>(getLastSocketErrorCode(),
"poll failed");
if (PollStatus == 0)
More information about the llvm-commits
mailing list