[Lldb-commits] [lldb] [lldb] Update JSONTransport to use MainLoop for reading. (PR #152367)
via lldb-commits
lldb-commits at lists.llvm.org
Thu Aug 7 09:42:58 PDT 2025
llvmbot wrote:
<!--LLVM PR SUMMARY COMMENT-->
@llvm/pr-subscribers-lldb
Author: John Harrison (ashgti)
<details>
<summary>Changes</summary>
Reapply "[lldb] Update JSONTransport to use MainLoop for reading." (#<!-- -->152155)
This reverts commit cd40281685f642ad879e33f3fda8d1faa136ebf4.
This also includes some updates to try to address the platforms with failing tests.
I updated the JSONTransport and tests to use std::function instead of llvm::unique_function. I think the tests were failing due to the unique_function not being moved correctly in the loop on some platforms.
---
Patch is 54.04 KiB, truncated to 20.00 KiB below, full version: https://github.com/llvm/llvm-project/pull/152367.diff
11 Files Affected:
- (modified) lldb/include/lldb/Host/JSONTransport.h (+84-29)
- (modified) lldb/source/Host/common/JSONTransport.cpp (+53-114)
- (modified) lldb/test/API/tools/lldb-dap/io/TestDAP_io.py (+17-10)
- (modified) lldb/tools/lldb-dap/DAP.cpp (+65-63)
- (modified) lldb/tools/lldb-dap/DAP.h (+7)
- (modified) lldb/tools/lldb-dap/Transport.h (+1-1)
- (modified) lldb/unittests/DAP/DAPTest.cpp (+5-6)
- (modified) lldb/unittests/DAP/TestBase.cpp (+17-9)
- (modified) lldb/unittests/DAP/TestBase.h (+20)
- (modified) lldb/unittests/Host/JSONTransportTest.cpp (+228-71)
- (modified) lldb/unittests/ProtocolServer/ProtocolMCPServerTest.cpp (+76-55)
``````````diff
diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h
index 4087cdf2b42f7..98bce6e265356 100644
--- a/lldb/include/lldb/Host/JSONTransport.h
+++ b/lldb/include/lldb/Host/JSONTransport.h
@@ -13,13 +13,15 @@
#ifndef LLDB_HOST_JSONTRANSPORT_H
#define LLDB_HOST_JSONTRANSPORT_H
+#include "lldb/Host/MainLoopBase.h"
#include "lldb/lldb-forward.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/JSON.h"
-#include <chrono>
+#include <string>
#include <system_error>
+#include <vector>
namespace lldb_private {
@@ -28,27 +30,33 @@ class TransportEOFError : public llvm::ErrorInfo<TransportEOFError> {
static char ID;
TransportEOFError() = default;
-
- void log(llvm::raw_ostream &OS) const override {
- OS << "transport end of file reached";
- }
+ void log(llvm::raw_ostream &OS) const override { OS << "transport EOF"; }
std::error_code convertToErrorCode() const override {
- return llvm::inconvertibleErrorCode();
+ return std::make_error_code(std::errc::io_error);
}
};
-class TransportTimeoutError : public llvm::ErrorInfo<TransportTimeoutError> {
+class TransportUnhandledContentsError
+ : public llvm::ErrorInfo<TransportUnhandledContentsError> {
public:
static char ID;
- TransportTimeoutError() = default;
+ explicit TransportUnhandledContentsError(std::string unhandled_contents)
+ : m_unhandled_contents(unhandled_contents) {}
void log(llvm::raw_ostream &OS) const override {
- OS << "transport operation timed out";
+ OS << "transport EOF with unhandled contents " << m_unhandled_contents;
}
std::error_code convertToErrorCode() const override {
- return std::make_error_code(std::errc::timed_out);
+ return std::make_error_code(std::errc::bad_message);
}
+
+ const std::string &getUnhandledContents() const {
+ return m_unhandled_contents;
+ }
+
+private:
+ std::string m_unhandled_contents;
};
class TransportInvalidError : public llvm::ErrorInfo<TransportInvalidError> {
@@ -68,6 +76,10 @@ class TransportInvalidError : public llvm::ErrorInfo<TransportInvalidError> {
/// A transport class that uses JSON for communication.
class JSONTransport {
public:
+ using ReadHandleUP = MainLoopBase::ReadHandleUP;
+ template <typename T>
+ using Callback = std::function<void(MainLoopBase &, const llvm::Expected<T>)>;
+
JSONTransport(lldb::IOObjectSP input, lldb::IOObjectSP output);
virtual ~JSONTransport() = default;
@@ -83,24 +95,69 @@ class JSONTransport {
return WriteImpl(message);
}
- /// Reads the next message from the input stream.
+ /// Registers the transport with the MainLoop.
template <typename T>
- llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
- llvm::Expected<std::string> message = ReadImpl(timeout);
- if (!message)
- return message.takeError();
- return llvm::json::parse<T>(/*JSON=*/*message);
+ llvm::Expected<ReadHandleUP> RegisterReadObject(MainLoopBase &loop,
+ Callback<T> callback) {
+ Status error;
+ ReadHandleUP handle = loop.RegisterReadObject(
+ m_input,
+ [&](MainLoopBase &loop) {
+ char buffer[kReadBufferSize];
+ size_t len = sizeof(buffer);
+ if (llvm::Error error = m_input->Read(buffer, len).takeError()) {
+ callback(loop, std::move(error));
+ return;
+ }
+
+ if (len)
+ m_buffer.append(std::string(buffer, len));
+
+ // If the buffer has contents, try parsing any pending messages.
+ if (!m_buffer.empty()) {
+ llvm::Expected<std::vector<std::string>> messages = Parse();
+ if (llvm::Error error = messages.takeError()) {
+ callback(loop, std::move(error));
+ return;
+ }
+
+ for (const auto &message : *messages)
+ if constexpr (std::is_same<T, std::string>::value)
+ callback(loop, message);
+ else
+ callback(loop, llvm::json::parse<T>(message));
+ }
+
+ // On EOF, notify the callback after the remaining messages were
+ // handled.
+ if (len == 0) {
+ if (m_buffer.empty())
+ callback(loop, llvm::make_error<TransportEOFError>());
+ else
+ callback(loop, llvm::make_error<TransportUnhandledContentsError>(
+ m_buffer));
+ }
+ },
+ error);
+ if (error.Fail())
+ return error.takeError();
+ return handle;
}
protected:
+ template <typename... Ts> inline auto Logv(const char *Fmt, Ts &&...Vals) {
+ Log(llvm::formatv(Fmt, std::forward<Ts>(Vals)...).str());
+ }
virtual void Log(llvm::StringRef message);
virtual llvm::Error WriteImpl(const std::string &message) = 0;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) = 0;
+ virtual llvm::Expected<std::vector<std::string>> Parse() = 0;
lldb::IOObjectSP m_input;
lldb::IOObjectSP m_output;
+ std::string m_buffer;
+
+ static constexpr size_t kReadBufferSize = 1024;
};
/// A transport class for JSON with a HTTP header.
@@ -111,14 +168,13 @@ class HTTPDelimitedJSONTransport : public JSONTransport {
virtual ~HTTPDelimitedJSONTransport() = default;
protected:
- virtual llvm::Error WriteImpl(const std::string &message) override;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) override;
-
- // FIXME: Support any header.
- static constexpr llvm::StringLiteral kHeaderContentLength =
- "Content-Length: ";
- static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n\r\n";
+ llvm::Error WriteImpl(const std::string &message) override;
+ llvm::Expected<std::vector<std::string>> Parse() override;
+
+ static constexpr llvm::StringLiteral kHeaderContentLength = "Content-Length";
+ static constexpr llvm::StringLiteral kHeaderFieldSeparator = ":";
+ static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n";
+ static constexpr llvm::StringLiteral kEndOfHeader = "\r\n\r\n";
};
/// A transport class for JSON RPC.
@@ -129,9 +185,8 @@ class JSONRPCTransport : public JSONTransport {
virtual ~JSONRPCTransport() = default;
protected:
- virtual llvm::Error WriteImpl(const std::string &message) override;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) override;
+ llvm::Error WriteImpl(const std::string &message) override;
+ llvm::Expected<std::vector<std::string>> Parse() override;
static constexpr llvm::StringLiteral kMessageSeparator = "\n";
};
diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp
index 546c12c8f7114..c3a3b06ecbced 100644
--- a/lldb/source/Host/common/JSONTransport.cpp
+++ b/lldb/source/Host/common/JSONTransport.cpp
@@ -7,17 +7,14 @@
//===----------------------------------------------------------------------===//
#include "lldb/Host/JSONTransport.h"
-#include "lldb/Utility/IOObject.h"
#include "lldb/Utility/LLDBLog.h"
#include "lldb/Utility/Log.h"
-#include "lldb/Utility/SelectHelper.h"
#include "lldb/Utility/Status.h"
#include "lldb/lldb-forward.h"
#include "llvm/ADT/StringExtras.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/raw_ostream.h"
-#include <optional>
#include <string>
#include <utility>
@@ -25,64 +22,6 @@ using namespace llvm;
using namespace lldb;
using namespace lldb_private;
-/// ReadFull attempts to read the specified number of bytes. If EOF is
-/// encountered, an empty string is returned.
-static Expected<std::string>
-ReadFull(IOObject &descriptor, size_t length,
- std::optional<std::chrono::microseconds> timeout = std::nullopt) {
- if (!descriptor.IsValid())
- return llvm::make_error<TransportInvalidError>();
-
- bool timeout_supported = true;
- // FIXME: SelectHelper does not work with NativeFile on Win32.
-#if _WIN32
- timeout_supported = descriptor.GetFdType() == IOObject::eFDTypeSocket;
-#endif
-
- if (timeout && timeout_supported) {
- SelectHelper sh;
- sh.SetTimeout(*timeout);
- sh.FDSetRead(
- reinterpret_cast<lldb::socket_t>(descriptor.GetWaitableHandle()));
- Status status = sh.Select();
- if (status.Fail()) {
- // Convert timeouts into a specific error.
- if (status.GetType() == lldb::eErrorTypePOSIX &&
- status.GetError() == ETIMEDOUT)
- return make_error<TransportTimeoutError>();
- return status.takeError();
- }
- }
-
- std::string data;
- data.resize(length);
- Status status = descriptor.Read(data.data(), length);
- if (status.Fail())
- return status.takeError();
-
- // Read returns '' on EOF.
- if (length == 0)
- return make_error<TransportEOFError>();
-
- // Return the actual number of bytes read.
- return data.substr(0, length);
-}
-
-static Expected<std::string>
-ReadUntil(IOObject &descriptor, StringRef delimiter,
- std::optional<std::chrono::microseconds> timeout = std::nullopt) {
- std::string buffer;
- buffer.reserve(delimiter.size() + 1);
- while (!llvm::StringRef(buffer).ends_with(delimiter)) {
- Expected<std::string> next =
- ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
- if (auto Err = next.takeError())
- return std::move(Err);
- buffer += *next;
- }
- return buffer.substr(0, buffer.size() - delimiter.size());
-}
-
JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output)
: m_input(std::move(input)), m_output(std::move(output)) {}
@@ -90,80 +29,80 @@ void JSONTransport::Log(llvm::StringRef message) {
LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message);
}
-Expected<std::string>
-HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
- if (!m_input || !m_input->IsValid())
- return llvm::make_error<TransportInvalidError>();
+// Parses messages based on
+// https://microsoft.github.io/debug-adapter-protocol/overview#base-protocol
+Expected<std::vector<std::string>> HTTPDelimitedJSONTransport::Parse() {
+ std::vector<std::string> messages;
+ StringRef buffer = m_buffer;
+ while (buffer.contains(kEndOfHeader)) {
+ auto [headers, rest] = buffer.split(kEndOfHeader);
+ size_t content_length = 0;
+ // HTTP Headers are formatted like `<field-name> ':' [<field-value>]`.
+ for (const auto &header : llvm::split(headers, kHeaderSeparator)) {
+ auto [key, value] = header.split(kHeaderFieldSeparator);
+ // 'Content-Length' is the only meaningful key at the moment. Others are
+ // ignored.
+ if (!key.equals_insensitive(kHeaderContentLength))
+ continue;
+
+ value = value.trim();
+ if (!llvm::to_integer(value, content_length, 10))
+ return createStringError(std::errc::invalid_argument,
+ "invalid content length: %s",
+ value.str().c_str());
+ }
+
+ // Check if we have enough data.
+ if (content_length > rest.size())
+ break;
- IOObject *input = m_input.get();
- Expected<std::string> message_header =
- ReadFull(*input, kHeaderContentLength.size(), timeout);
- if (!message_header)
- return message_header.takeError();
- if (*message_header != kHeaderContentLength)
- return createStringError(formatv("expected '{0}' and got '{1}'",
- kHeaderContentLength, *message_header)
- .str());
-
- Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator);
- if (!raw_length)
- return handleErrors(raw_length.takeError(),
- [&](const TransportEOFError &E) -> llvm::Error {
- return createStringError(
- "unexpected EOF while reading header separator");
- });
-
- size_t length;
- if (!to_integer(*raw_length, length))
- return createStringError(
- formatv("invalid content length {0}", *raw_length).str());
-
- Expected<std::string> raw_json = ReadFull(*input, length);
- if (!raw_json)
- return handleErrors(
- raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error {
- return createStringError("unexpected EOF while reading JSON");
- });
-
- Log(llvm::formatv("--> {0}", *raw_json).str());
-
- return raw_json;
+ StringRef body = rest.take_front(content_length);
+ buffer = rest.drop_front(content_length);
+ messages.emplace_back(body.str());
+ Logv("--> {0}", body);
+ }
+
+ // Store the remainder of the buffer for the next read callback.
+ m_buffer = buffer.str();
+
+ return std::move(messages);
}
Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
if (!m_output || !m_output->IsValid())
return llvm::make_error<TransportInvalidError>();
- Log(llvm::formatv("<-- {0}", message).str());
+ Logv("<-- {0}", message);
std::string Output;
raw_string_ostream OS(Output);
- OS << kHeaderContentLength << message.length() << kHeaderSeparator << message;
+ OS << kHeaderContentLength << kHeaderFieldSeparator << ' ' << message.length()
+ << kHeaderSeparator << kHeaderSeparator << message;
size_t num_bytes = Output.size();
return m_output->Write(Output.data(), num_bytes).takeError();
}
-Expected<std::string>
-JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) {
- if (!m_input || !m_input->IsValid())
- return make_error<TransportInvalidError>();
-
- IOObject *input = m_input.get();
- Expected<std::string> raw_json =
- ReadUntil(*input, kMessageSeparator, timeout);
- if (!raw_json)
- return raw_json.takeError();
+Expected<std::vector<std::string>> JSONRPCTransport::Parse() {
+ std::vector<std::string> messages;
+ StringRef buf = m_buffer;
+ while (buf.contains(kMessageSeparator)) {
+ auto [raw_json, rest] = buf.split(kMessageSeparator);
+ buf = rest;
+ messages.emplace_back(raw_json.str());
+ Logv("--> {0}", raw_json);
+ }
- Log(llvm::formatv("--> {0}", *raw_json).str());
+ // Store the remainder of the buffer for the next read callback.
+ m_buffer = buf.str();
- return *raw_json;
+ return messages;
}
Error JSONRPCTransport::WriteImpl(const std::string &message) {
if (!m_output || !m_output->IsValid())
return llvm::make_error<TransportInvalidError>();
- Log(llvm::formatv("<-- {0}", message).str());
+ Logv("<-- {0}", message);
std::string Output;
llvm::raw_string_ostream OS(Output);
@@ -173,5 +112,5 @@ Error JSONRPCTransport::WriteImpl(const std::string &message) {
}
char TransportEOFError::ID;
-char TransportTimeoutError::ID;
+char TransportUnhandledContentsError::ID;
char TransportInvalidError::ID;
diff --git a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
index b72b98de412b4..af5c62a8c4eb5 100644
--- a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
+++ b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
@@ -8,6 +8,9 @@
import lldbdap_testcase
import dap_server
+EXIT_FAILURE = 1
+EXIT_SUCCESS = 0
+
class TestDAP_io(lldbdap_testcase.DAPTestCaseBase):
def launch(self):
@@ -41,40 +44,44 @@ def test_eof_immediately(self):
"""
process = self.launch()
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 0)
+ self.assertEqual(process.wait(timeout=5.0), EXIT_SUCCESS)
def test_invalid_header(self):
"""
- lldb-dap handles invalid message headers.
+ lldb-dap returns a failure exit code when the input stream is closed
+ with a malformed request header.
"""
process = self.launch()
- process.stdin.write(b"not the corret message header")
+ process.stdin.write(b"not the correct message header")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
def test_partial_header(self):
"""
- lldb-dap handles parital message headers.
+ lldb-dap returns a failure exit code when the input stream is closed
+ with an incomplete message header is in the message buffer.
"""
process = self.launch()
process.stdin.write(b"Content-Length: ")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
def test_incorrect_content_length(self):
"""
- lldb-dap handles malformed content length headers.
+ lldb-dap returns a failure exit code when reading malformed content
+ length headers.
"""
process = self.launch()
process.stdin.write(b"Content-Length: abc")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
def test_partial_content_length(self):
"""
- lldb-dap handles partial messages.
+ lldb-dap returns a failure exit code when the input stream is closed
+ with a partial message in the message buffer.
"""
process = self.launch()
process.stdin.write(b"Content-Length: 10\r\n\r\n{")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), EXIT_FAILURE)
diff --git a/lldb/tools/lldb-dap/DAP.cpp b/lldb/tools/lldb-dap/DAP.cpp
index cbd3b14463e25..55c5c9347bf6f 100644
--- a/lldb/tools/lldb-dap/DAP.cpp
+++ b/lldb/tools/lldb-dap/DAP.cpp
@@ -23,13 +23,14 @@
#include "Transport.h"
#include "lldb/API/SBBreakpoint.h"
#include "lldb/API/SBCommandInterpreter.h"
-#include "lldb/API/SBCommandReturnObject.h"
#include "lldb/API/SBEvent.h"
#include "lldb/API/SBLanguageRuntime.h"
#include "lldb/API/SBListener.h"
#include "lldb/API/SBProcess.h"
#include "lldb/API/SBStream.h"
-#include "lldb/Utility/IOObject.h"
+#include "lldb/Host/JSONTransport.h"
+#include "lldb/Host/MainLoop.h"
+#include "lldb/Host/MainLoopBase.h"
#include "lldb/Utility/Status.h"
#include "lldb/lldb-defines.h"
#include "lldb/lldb-enumerations.h"
@@ -52,7 +53,7 @@
#include <cstdarg>
#include <cstdint>
#include <cstdio>
-#include <fstream>
+#include <functional>
#include <future>
#include <memory>
#include <mutex>
@@ -919,6 +920,8 @@ llvm::Error DAP::Disconnect(bool terminateDebuggee) {
SendTerminatedEvent();
disconnecting = true;
+ m_loop.AddPendingCallback(
+ [](MainLoopBase &loop) { loop.RequestTermination(); });
return ToError(error);
}
@@ -949,75 +952,74 @@ static std::optional<T> getArgumentsIfRequest(const Message &pm,
return args;
}
-llvm::Error DAP::Loop() {
- // Can't use \a std::future<llvm::Error> because it doesn't compile on
- // Windows.
- std::future<lldb::SBError> queue_reader =
- std::async(std::launch::async, [&]() -> lldb::SBError {
- llvm::set_thread_name(transport.GetClientName() + ".transport_handler");
- auto cleanup = llvm::make_scope_exit([&]() {
- // Ensure we're marked as disconnecting when the reader exits.
- disconnecting = true;
- m_queue_cv.notify_all();
- });
-
- while (!disconnecting) {
- llvm::Expected<Message> next =
- transport.Read<protocol::Message>(std::chrono::seconds(1));
- if (next.errorIsA<TransportEOFError>()) {
- consumeError(next.takeError());
- break;
- }
+Status DAP::TransportHandler() {
+ llvm::set_thread_name(transport.GetClientName() + ".transport_handler");
- // If the read timed out, continue to check if we should disconnect.
- if (next.errorIsA<TransportTimeoutError>()) {
- consumeError(next.takeError());
- continue;
- }
+ auto cleanup = llvm::make_scope_exit([&]() {
+ // Ensure we're marked as disconnecting when the reader exits.
+ disconnecting = true;
+ m_queue_cv.notify_all();
+ });
- if (llvm::Error err = next.takeError()) {
- lldb::SBError errWrapper;
- errWrapper.SetErrorString(llvm::toString(std::move(err)).c_str());
- return errWrapper;
- }
+ Status ...
[truncated]
``````````
</details>
https://github.com/llvm/llvm-project/pull/152367
More information about the lldb-commits
mailing list