[Lldb-commits] [lldb] 9823d42 - [lldb] [Core] Split read thread support into ThreadedCommunication
Michał Górny via lldb-commits
lldb-commits at lists.llvm.org
Tue Sep 6 04:09:55 PDT 2022
Author: Michał Górny
Date: 2022-09-06T13:09:42+02:00
New Revision: 9823d42557eb1da3ecf2f771ea2cbc84a988ef92
URL: https://github.com/llvm/llvm-project/commit/9823d42557eb1da3ecf2f771ea2cbc84a988ef92
DIFF: https://github.com/llvm/llvm-project/commit/9823d42557eb1da3ecf2f771ea2cbc84a988ef92.diff
LOG: [lldb] [Core] Split read thread support into ThreadedCommunication
Split the read thread support from Communication into a dedicated
ThreadedCommunication subclass. The read thread support is used only
by a subset of Communication consumers, and it adds a lot of complexity
to the base class. Furthermore, having a dedicated subclass makes it
clear whether a particular consumer needs to account for the possibility
of read thread being running or not.
The modules currently calling `StartReadThread()` are updated to use
`ThreadedCommunication`. The remaining modules use the simplified
`Communication` class.
`SBCommunication` is changed to use `ThreadedCommunication` in order
to avoid changing the public API.
`CommunicationKDP` is updated in order to (hopefully) compile with
the new code. However, I do not have a Darwin box to test it, so I've
limited the changes to the bare minimum.
`GDBRemoteCommunication` is updated to become a `Broadcaster` directly.
Since it does not inherit from `ThreadedCommunication`, its event
support no longer collides with the one used for read thread and can
be implemented cleanly. The support for
`eBroadcastBitReadThreadDidExit` is removed from the code -- since
the read thread was not used, this event was never reported.
Sponsored by: The FreeBSD Foundation
Differential Revision: https://reviews.llvm.org/D133251
Added:
lldb/include/lldb/Core/ThreadedCommunication.h
lldb/source/Core/ThreadedCommunication.cpp
Modified:
lldb/include/lldb/API/SBCommunication.h
lldb/include/lldb/Core/Communication.h
lldb/include/lldb/Interpreter/ScriptInterpreter.h
lldb/include/lldb/Target/Process.h
lldb/include/lldb/lldb-forward.h
lldb/source/API/SBCommunication.cpp
lldb/source/Core/CMakeLists.txt
lldb/source/Core/Communication.cpp
lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
lldb/unittests/Core/CommunicationTest.cpp
Removed:
################################################################################
diff --git a/lldb/include/lldb/API/SBCommunication.h b/lldb/include/lldb/API/SBCommunication.h
index e407d859d8850..fa8c88acd3fb8 100644
--- a/lldb/include/lldb/API/SBCommunication.h
+++ b/lldb/include/lldb/API/SBCommunication.h
@@ -75,7 +75,7 @@ class LLDB_API SBCommunication {
SBCommunication(const SBCommunication &) = delete;
const SBCommunication &operator=(const SBCommunication &) = delete;
- lldb_private::Communication *m_opaque = nullptr;
+ lldb_private::ThreadedCommunication *m_opaque = nullptr;
bool m_opaque_owned = false;
};
diff --git a/lldb/include/lldb/Core/Communication.h b/lldb/include/lldb/Core/Communication.h
index c07b06a6c2509..a0151527fe5de 100644
--- a/lldb/include/lldb/Core/Communication.h
+++ b/lldb/include/lldb/Core/Communication.h
@@ -9,22 +9,15 @@
#ifndef LLDB_CORE_COMMUNICATION_H
#define LLDB_CORE_COMMUNICATION_H
-#include "lldb/Host/HostThread.h"
-#include "lldb/Utility/Broadcaster.h"
#include "lldb/Utility/Timeout.h"
#include "lldb/lldb-defines.h"
#include "lldb/lldb-enumerations.h"
#include "lldb/lldb-forward.h"
#include "lldb/lldb-types.h"
-#include <atomic>
#include <mutex>
-#include <ratio>
#include <string>
-#include <cstddef>
-#include <cstdint>
-
namespace lldb_private {
class Connection;
class ConstString;
@@ -38,90 +31,22 @@ class Status;
/// approach has a couple of advantages: it allows a single instance of this
/// class to be used even though its connection can change. Connections could
/// negotiate for
diff erent connections based on abilities like starting with
-/// Bluetooth and negotiating up to WiFi if available. It also allows this
-/// class to be subclassed by any interfaces that don't want to give bytes but
-/// want to validate and give out packets. This can be done by overriding:
-///
-/// AppendBytesToCache (const uint8_t *src, size_t src_len, bool broadcast);
-///
-/// Communication inherits from Broadcaster which means it can be used in
-/// conjunction with Listener to wait for multiple broadcaster objects and
-/// multiple events from each of those objects. Communication defines a set of
-/// pre-defined event bits (see enumerations definitions that start with
-/// "eBroadcastBit" below).
-///
-/// There are two modes in which communications can occur:
-/// \li single-threaded
-/// \li multi-threaded
-///
-/// In single-threaded mode, all reads and writes happen synchronously on the
-/// calling thread.
-///
-/// In multi-threaded mode, a read thread is spawned that continually reads
-/// data and caches any received bytes. To start the read thread clients call:
-///
-/// bool Communication::StartReadThread (Status *);
-///
-/// If true is returned a read thread has been spawned that will continually
-/// execute a call to the pure virtual DoRead function:
+/// Bluetooth and negotiating up to WiFi if available.
///
-/// size_t Communication::ReadFromConnection (void *, size_t, uint32_t);
-///
-/// When bytes are received the data gets cached in \a m_bytes and this class
-/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that
-/// want packet based communication should override AppendBytesToCache. The
-/// subclasses can choose to call the built in AppendBytesToCache with the \a
-/// broadcast parameter set to false. This will cause the \b
-/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the
-/// subclass can post a \b eBroadcastBitPacketAvailable event when a full
-/// packet of data has been received.
-///
-/// If the connection is disconnected a \b eBroadcastBitDisconnected event
-/// gets broadcast. If the read thread exits a \b
-/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also
-/// post a \b eBroadcastBitReadThreadShouldExit event to this object which
-/// will cause the read thread to exit.
-class Communication : public Broadcaster {
+/// When using this class, all reads and writes happen synchronously on the
+/// calling thread. There is also a ThreadedCommunication class that supports
+/// multi-threaded mode.
+class Communication {
public:
- FLAGS_ANONYMOUS_ENUM(){
- eBroadcastBitDisconnected =
- (1u << 0), ///< Sent when the communications connection is lost.
- eBroadcastBitReadThreadGotBytes =
- (1u << 1), ///< Sent by the read thread when bytes become available.
- eBroadcastBitReadThreadDidExit =
- (1u
- << 2), ///< Sent by the read thread when it exits to inform clients.
- eBroadcastBitReadThreadShouldExit =
- (1u << 3), ///< Sent by clients that need to cancel the read thread.
- eBroadcastBitPacketAvailable =
- (1u << 4), ///< Sent when data received makes a complete packet.
- eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
- ///to indicate all pending
- ///input has been processed.
- kLoUserBroadcastBit =
- (1u << 16), ///< Subclasses can used bits 31:16 for any needed events.
- kHiUserBroadcastBit = (1u << 31),
- eAllEventBits = 0xffffffff};
-
- typedef void (*ReadThreadBytesReceived)(void *baton, const void *src,
- size_t src_len);
-
- /// Construct the Communication object with the specified name for the
- /// Broadcaster that this object inherits from.
- ///
- /// \param[in] broadcaster_name
- /// The name of the broadcaster object. This name should be as
- /// complete as possible to uniquely identify this object. The
- /// broadcaster name can be updated after the connect function
- /// is called.
- Communication(const char *broadcaster_name);
+ /// Construct the Communication object.
+ Communication();
/// Destructor.
///
/// The destructor is virtual since this class gets subclassed.
- ~Communication() override;
+ virtual ~Communication();
- void Clear();
+ virtual void Clear();
/// Connect using the current connection by passing \a url to its connect
/// function. string.
@@ -148,7 +73,7 @@ class Communication : public Broadcaster {
///
/// \see Status& Communication::GetError ();
/// \see bool Connection::Disconnect ();
- lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);
+ virtual lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);
/// Check if the connection is valid.
///
@@ -166,13 +91,6 @@ class Communication : public Broadcaster {
/// If no read thread is running, this function call the connection's
/// Connection::Read(...) function to get any available.
///
- /// If a read thread has been started, this function will check for any
- /// cached bytes that have already been read and return any currently
- /// available bytes. If no bytes are cached, it will wait for the bytes to
- /// become available by listening for the \a eBroadcastBitReadThreadGotBytes
- /// event. If this function consumes all of the bytes in the cache, it will
- /// reset the \a eBroadcastBitReadThreadGotBytes event bit.
- ///
/// \param[in] dst
/// A destination buffer that must be at least \a dst_len bytes
/// long.
@@ -188,8 +106,9 @@ class Communication : public Broadcaster {
/// The number of bytes actually read.
///
/// \see size_t Connection::Read (void *, size_t);
- size_t Read(void *dst, size_t dst_len, const Timeout<std::micro> &timeout,
- lldb::ConnectionStatus &status, Status *error_ptr);
+ virtual size_t Read(void *dst, size_t dst_len,
+ const Timeout<std::micro> &timeout,
+ lldb::ConnectionStatus &status, Status *error_ptr);
/// The actual write function that attempts to write to the communications
/// protocol.
@@ -237,69 +156,7 @@ class Communication : public Broadcaster {
///
/// \see
/// class Connection
- void SetConnection(std::unique_ptr<Connection> connection);
-
- /// Starts a read thread whose sole purpose it to read bytes from the
- /// current connection. This function will call connection's read function:
- ///
- /// size_t Connection::Read (void *, size_t);
- ///
- /// When bytes are read and cached, this function will call:
- ///
- /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len,
- /// bool
- /// broadcast);
- ///
- /// Subclasses should override this function if they wish to override the
- /// default action of caching the bytes and broadcasting a \b
- /// eBroadcastBitReadThreadGotBytes event.
- ///
- /// \return
- /// \b True if the read thread was successfully started, \b
- /// false otherwise.
- ///
- /// \see size_t Connection::Read (void *, size_t);
- /// \see void Communication::AppendBytesToCache (const uint8_t * bytes,
- /// size_t len, bool broadcast);
- virtual bool StartReadThread(Status *error_ptr = nullptr);
-
- /// Stops the read thread by cancelling it.
- ///
- /// \return
- /// \b True if the read thread was successfully canceled, \b
- /// false otherwise.
- virtual bool StopReadThread(Status *error_ptr = nullptr);
-
- virtual bool JoinReadThread(Status *error_ptr = nullptr);
- /// Checks if there is a currently running read thread.
- ///
- /// \return
- /// \b True if the read thread is running, \b false otherwise.
- bool ReadThreadIsRunning();
-
- /// The read thread function. This function will call the "DoRead"
- /// function continuously and wait for data to become available. When data
- /// is received it will append the available data to the internal cache and
- /// broadcast a \b eBroadcastBitReadThreadGotBytes event.
- ///
- /// \param[in] comm_ptr
- /// A pointer to an instance of this class.
- ///
- /// \return
- /// \b NULL.
- ///
- /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t);
- lldb::thread_result_t ReadThread();
-
- void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
- void *callback_baton);
-
- /// Wait for the read thread to process all outstanding data.
- ///
- /// After this function returns, the read thread has processed all data that
- /// has been waiting in the Connection queue.
- ///
- void SynchronizeWithReadThread();
+ virtual void SetConnection(std::unique_ptr<Connection> connection);
static std::string ConnectionStatusAsString(lldb::ConnectionStatus status);
@@ -307,76 +164,17 @@ class Communication : public Broadcaster {
void SetCloseOnEOF(bool b) { m_close_on_eof = b; }
- static ConstString &GetStaticBroadcasterClass();
-
- ConstString &GetBroadcasterClass() const override {
- return GetStaticBroadcasterClass();
- }
-
protected:
lldb::ConnectionSP m_connection_sp; ///< The connection that is current in use
///by this communications class.
- HostThread m_read_thread; ///< The read thread handle in case we need to
- ///cancel the thread.
- std::atomic<bool> m_read_thread_enabled;
- std::atomic<bool> m_read_thread_did_exit;
- std::string
- m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
- std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
- ///access to the cached bytes.
- lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
- ///from read thread.
- Status m_pass_error; ///< Error passthrough from read thread.
std::mutex
m_write_mutex; ///< Don't let multiple threads write at the same time...
- std::mutex m_synchronize_mutex;
- ReadThreadBytesReceived m_callback;
- void *m_callback_baton;
bool m_close_on_eof;
size_t ReadFromConnection(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);
- /// Append new bytes that get read from the read thread into the internal
- /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
- /// event to be broadcast if \a broadcast is true.
- ///
- /// Subclasses can override this function in order to inspect the received
- /// data and check if a packet is available.
- ///
- /// Subclasses can also still call this function from the overridden method
- /// to allow the caching to correctly happen and suppress the broadcasting
- /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast
- /// to false.
- ///
- /// \param[in] src
- /// A source buffer that must be at least \a src_len bytes
- /// long.
- ///
- /// \param[in] src_len
- /// The number of bytes to append to the cache.
- virtual void AppendBytesToCache(const uint8_t *src, size_t src_len,
- bool broadcast,
- lldb::ConnectionStatus status);
-
- /// Get any available bytes from our data cache. If this call empties the
- /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset
- /// to signify no more bytes are available.
- ///
- /// \param[in] dst
- /// A destination buffer that must be at least \a dst_len bytes
- /// long.
- ///
- /// \param[in] dst_len
- /// The number of bytes to attempt to read from the cache,
- /// and also the max number of bytes that can be placed into
- /// \a dst.
- ///
- /// \return
- /// The number of bytes extracted from the data cache.
- size_t GetCachedBytes(void *dst, size_t dst_len);
-
private:
Communication(const Communication &) = delete;
const Communication &operator=(const Communication &) = delete;
diff --git a/lldb/include/lldb/Core/ThreadedCommunication.h b/lldb/include/lldb/Core/ThreadedCommunication.h
new file mode 100644
index 0000000000000..b7412c796f107
--- /dev/null
+++ b/lldb/include/lldb/Core/ThreadedCommunication.h
@@ -0,0 +1,288 @@
+//===-- ThreadedCommunication.h ---------------------------------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLDB_CORE_THREADEDCOMMUNICATION_H
+#define LLDB_CORE_THREADEDCOMMUNICATION_H
+
+#include "lldb/Core/Communication.h"
+#include "lldb/Host/HostThread.h"
+#include "lldb/Utility/Broadcaster.h"
+
+#include <atomic>
+#include <mutex>
+#include <string>
+
+#include <cstddef>
+#include <cstdint>
+
+namespace lldb_private {
+
+/// \class ThreadedCommunication ThreadedCommunication.h
+/// "lldb/Core/ThreadedCommunication.h" Variation of Communication that
+/// supports threaded reads.
+///
+/// ThreadedCommunication enhances the base Communication class with support
+/// for multi-threaded mode. In this mode, a read thread is spawned that
+/// continually reads data and caches any received bytes. To start the read
+/// thread clients call:
+///
+/// bool ThreadedCommunication::StartReadThread (Status *);
+///
+/// If true is returned a read thread has been spawned that will continually
+/// execute a call to the pure virtual DoRead function:
+///
+/// size_t Communication::ReadFromConnection (void *, size_t, uint32_t);
+///
+/// When bytes are received the data gets cached in \a m_bytes and this class
+/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that
+/// want packet based communication should override AppendBytesToCache. The
+/// subclasses can choose to call the built in AppendBytesToCache with the \a
+/// broadcast parameter set to false. This will cause the \b
+/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the
+/// subclass can post a \b eBroadcastBitPacketAvailable event when a full
+/// packet of data has been received.
+///
+/// If the connection is disconnected a \b eBroadcastBitDisconnected event
+/// gets broadcast. If the read thread exits a \b
+/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also
+/// post a \b eBroadcastBitReadThreadShouldExit event to this object which
+/// will cause the read thread to exit.
+///
+/// ThreadedCommunication inherits from Broadcaster which means it can be used
+/// in conjunction with Listener to wait for multiple broadcaster objects and
+/// multiple events from each of those objects. ThreadedCommunication defines a
+/// set of pre-defined event bits (see enumerations definitions that start with
+/// "eBroadcastBit" below).
+class ThreadedCommunication : public Communication, public Broadcaster {
+ using Communication::Communication;
+
+public:
+ FLAGS_ANONYMOUS_ENUM(){
+ eBroadcastBitDisconnected =
+ (1u << 0), ///< Sent when the communications connection is lost.
+ eBroadcastBitReadThreadGotBytes =
+ (1u << 1), ///< Sent by the read thread when bytes become available.
+ eBroadcastBitReadThreadDidExit =
+ (1u
+ << 2), ///< Sent by the read thread when it exits to inform clients.
+ eBroadcastBitReadThreadShouldExit =
+ (1u << 3), ///< Sent by clients that need to cancel the read thread.
+ eBroadcastBitPacketAvailable =
+ (1u << 4), ///< Sent when data received makes a complete packet.
+ eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
+ /// to indicate all pending
+ /// input has been processed.
+ };
+
+ typedef void (*ReadThreadBytesReceived)(void *baton, const void *src,
+ size_t src_len);
+
+ /// Construct the ThreadedCommunication object with the specified name for the
+ /// Broadcaster that this object inherits from.
+ ///
+ /// \param[in] broadcaster_name
+ /// The name of the broadcaster object. This name should be as
+ /// complete as possible to uniquely identify this object. The
+ /// broadcaster name can be updated after the connect function
+ /// is called.
+ ThreadedCommunication(const char *broadcaster_name);
+
+ /// Destructor.
+ ///
+ /// The destructor is virtual since this class gets subclassed.
+ ~ThreadedCommunication() override;
+
+ void Clear() override;
+
+ /// Disconnect the communications connection if one is currently connected.
+ ///
+ /// \return
+ /// \b True if the disconnect succeeded, \b false otherwise. The
+ /// internal error object should be filled in with an
+ /// appropriate value based on the result of this function.
+ ///
+ /// \see Status& Communication::GetError ();
+ /// \see bool Connection::Disconnect ();
+ lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr) override;
+
+ /// Read bytes from the current connection.
+ ///
+ /// If no read thread is running, this function call the connection's
+ /// Connection::Read(...) function to get any available.
+ ///
+ /// If a read thread has been started, this function will check for any
+ /// cached bytes that have already been read and return any currently
+ /// available bytes. If no bytes are cached, it will wait for the bytes to
+ /// become available by listening for the \a eBroadcastBitReadThreadGotBytes
+ /// event. If this function consumes all of the bytes in the cache, it will
+ /// reset the \a eBroadcastBitReadThreadGotBytes event bit.
+ ///
+ /// \param[in] dst
+ /// A destination buffer that must be at least \a dst_len bytes
+ /// long.
+ ///
+ /// \param[in] dst_len
+ /// The number of bytes to attempt to read, and also the max
+ /// number of bytes that can be placed into \a dst.
+ ///
+ /// \param[in] timeout
+ /// A timeout value or llvm::None for no timeout.
+ ///
+ /// \return
+ /// The number of bytes actually read.
+ ///
+ /// \see size_t Connection::Read (void *, size_t);
+ size_t Read(void *dst, size_t dst_len, const Timeout<std::micro> &timeout,
+ lldb::ConnectionStatus &status, Status *error_ptr) override;
+
+ /// Sets the connection that it to be used by this class.
+ ///
+ /// By making a communication class that uses
diff erent connections it
+ /// allows a single communication interface to negotiate and change its
+ /// connection without any interruption to the client. It also allows the
+ /// Communication class to be subclassed for packet based communication.
+ ///
+ /// \param[in] connection
+ /// A connection that this class will own and destroy.
+ ///
+ /// \see
+ /// class Connection
+ void SetConnection(std::unique_ptr<Connection> connection) override;
+
+ /// Starts a read thread whose sole purpose it to read bytes from the
+ /// current connection. This function will call connection's read function:
+ ///
+ /// size_t Connection::Read (void *, size_t);
+ ///
+ /// When bytes are read and cached, this function will call:
+ ///
+ /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len,
+ /// bool
+ /// broadcast);
+ ///
+ /// Subclasses should override this function if they wish to override the
+ /// default action of caching the bytes and broadcasting a \b
+ /// eBroadcastBitReadThreadGotBytes event.
+ ///
+ /// \return
+ /// \b True if the read thread was successfully started, \b
+ /// false otherwise.
+ ///
+ /// \see size_t Connection::Read (void *, size_t);
+ /// \see void Communication::AppendBytesToCache (const uint8_t * bytes,
+ /// size_t len, bool broadcast);
+ virtual bool StartReadThread(Status *error_ptr = nullptr);
+
+ /// Stops the read thread by cancelling it.
+ ///
+ /// \return
+ /// \b True if the read thread was successfully canceled, \b
+ /// false otherwise.
+ virtual bool StopReadThread(Status *error_ptr = nullptr);
+
+ virtual bool JoinReadThread(Status *error_ptr = nullptr);
+ /// Checks if there is a currently running read thread.
+ ///
+ /// \return
+ /// \b True if the read thread is running, \b false otherwise.
+ bool ReadThreadIsRunning();
+
+ /// The read thread function. This function will call the "DoRead"
+ /// function continuously and wait for data to become available. When data
+ /// is received it will append the available data to the internal cache and
+ /// broadcast a \b eBroadcastBitReadThreadGotBytes event.
+ ///
+ /// \param[in] comm_ptr
+ /// A pointer to an instance of this class.
+ ///
+ /// \return
+ /// \b NULL.
+ ///
+ /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t);
+ lldb::thread_result_t ReadThread();
+
+ void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
+ void *callback_baton);
+
+ /// Wait for the read thread to process all outstanding data.
+ ///
+ /// After this function returns, the read thread has processed all data that
+ /// has been waiting in the Connection queue.
+ ///
+ void SynchronizeWithReadThread();
+
+ static ConstString &GetStaticBroadcasterClass();
+
+ ConstString &GetBroadcasterClass() const override {
+ return GetStaticBroadcasterClass();
+ }
+
+protected:
+ HostThread m_read_thread; ///< The read thread handle in case we need to
+ /// cancel the thread.
+ std::atomic<bool> m_read_thread_enabled;
+ std::atomic<bool> m_read_thread_did_exit;
+ std::string
+ m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
+ std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
+ /// access to the cached bytes.
+ lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
+ /// from read thread.
+ Status m_pass_error; ///< Error passthrough from read thread.
+ std::mutex m_synchronize_mutex;
+ ReadThreadBytesReceived m_callback;
+ void *m_callback_baton;
+
+ /// Append new bytes that get read from the read thread into the internal
+ /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
+ /// event to be broadcast if \a broadcast is true.
+ ///
+ /// Subclasses can override this function in order to inspect the received
+ /// data and check if a packet is available.
+ ///
+ /// Subclasses can also still call this function from the overridden method
+ /// to allow the caching to correctly happen and suppress the broadcasting
+ /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast
+ /// to false.
+ ///
+ /// \param[in] src
+ /// A source buffer that must be at least \a src_len bytes
+ /// long.
+ ///
+ /// \param[in] src_len
+ /// The number of bytes to append to the cache.
+ virtual void AppendBytesToCache(const uint8_t *src, size_t src_len,
+ bool broadcast,
+ lldb::ConnectionStatus status);
+
+ /// Get any available bytes from our data cache. If this call empties the
+ /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset
+ /// to signify no more bytes are available.
+ ///
+ /// \param[in] dst
+ /// A destination buffer that must be at least \a dst_len bytes
+ /// long.
+ ///
+ /// \param[in] dst_len
+ /// The number of bytes to attempt to read from the cache,
+ /// and also the max number of bytes that can be placed into
+ /// \a dst.
+ ///
+ /// \return
+ /// The number of bytes extracted from the data cache.
+ size_t GetCachedBytes(void *dst, size_t dst_len);
+
+private:
+ ThreadedCommunication(const ThreadedCommunication &) = delete;
+ const ThreadedCommunication &
+ operator=(const ThreadedCommunication &) = delete;
+};
+
+} // namespace lldb_private
+
+#endif // LLDB_CORE_THREADEDCOMMUNICATION_H
diff --git a/lldb/include/lldb/Interpreter/ScriptInterpreter.h b/lldb/include/lldb/Interpreter/ScriptInterpreter.h
index 83f784bde712e..cb3cafaf2ed51 100644
--- a/lldb/include/lldb/Interpreter/ScriptInterpreter.h
+++ b/lldb/include/lldb/Interpreter/ScriptInterpreter.h
@@ -13,10 +13,10 @@
#include "lldb/API/SBError.h"
#include "lldb/API/SBMemoryRegionInfo.h"
#include "lldb/Breakpoint/BreakpointOptions.h"
-#include "lldb/Core/Communication.h"
#include "lldb/Core/PluginInterface.h"
#include "lldb/Core/SearchFilter.h"
#include "lldb/Core/StreamFile.h"
+#include "lldb/Core/ThreadedCommunication.h"
#include "lldb/Host/PseudoTerminal.h"
#include "lldb/Interpreter/ScriptedProcessInterface.h"
#include "lldb/Utility/Broadcaster.h"
@@ -119,7 +119,7 @@ class ScriptInterpreterIORedirect {
lldb::FileSP m_input_file_sp;
lldb::StreamFileSP m_output_file_sp;
lldb::StreamFileSP m_error_file_sp;
- Communication m_communication;
+ ThreadedCommunication m_communication;
bool m_disconnect;
};
diff --git a/lldb/include/lldb/Target/Process.h b/lldb/include/lldb/Target/Process.h
index 05b0eb6237c71..5bca9f076cb58 100644
--- a/lldb/include/lldb/Target/Process.h
+++ b/lldb/include/lldb/Target/Process.h
@@ -22,10 +22,10 @@
#include <vector>
#include "lldb/Breakpoint/BreakpointSiteList.h"
-#include "lldb/Core/Communication.h"
#include "lldb/Core/LoadedModuleInfoList.h"
#include "lldb/Core/PluginInterface.h"
#include "lldb/Core/ThreadSafeValue.h"
+#include "lldb/Core/ThreadedCommunication.h"
#include "lldb/Core/UserSettingsController.h"
#include "lldb/Host/HostThread.h"
#include "lldb/Host/ProcessLaunchInfo.h"
@@ -2883,7 +2883,7 @@ void PruneThreadPlans();
m_unix_signals_sp; /// This is the current signal set for this process.
lldb::ABISP m_abi_sp;
lldb::IOHandlerSP m_process_input_reader;
- Communication m_stdio_communication;
+ ThreadedCommunication m_stdio_communication;
std::recursive_mutex m_stdio_communication_mutex;
bool m_stdin_forward; /// Remember if stdin must be forwarded to remote debug
/// server
diff --git a/lldb/include/lldb/lldb-forward.h b/lldb/include/lldb/lldb-forward.h
index 9ab7f3c1718ed..2fe6ac58be05c 100644
--- a/lldb/include/lldb/lldb-forward.h
+++ b/lldb/include/lldb/lldb-forward.h
@@ -236,6 +236,7 @@ class ThreadPlanStepThrough;
class ThreadPlanTracer;
class ThreadSpec;
class ThreadPostMortemTrace;
+class ThreadedCommunication;
class Trace;
class TraceCursor;
class TraceExporter;
diff --git a/lldb/source/API/SBCommunication.cpp b/lldb/source/API/SBCommunication.cpp
index 0a1dad1e2e8fb..b9c8b498a38e3 100644
--- a/lldb/source/API/SBCommunication.cpp
+++ b/lldb/source/API/SBCommunication.cpp
@@ -8,7 +8,7 @@
#include "lldb/API/SBCommunication.h"
#include "lldb/API/SBBroadcaster.h"
-#include "lldb/Core/Communication.h"
+#include "lldb/Core/ThreadedCommunication.h"
#include "lldb/Host/ConnectionFileDescriptor.h"
#include "lldb/Host/Host.h"
#include "lldb/Utility/Instrumentation.h"
@@ -19,7 +19,8 @@ using namespace lldb_private;
SBCommunication::SBCommunication() { LLDB_INSTRUMENT_VA(this); }
SBCommunication::SBCommunication(const char *broadcaster_name)
- : m_opaque(new Communication(broadcaster_name)), m_opaque_owned(true) {
+ : m_opaque(new ThreadedCommunication(broadcaster_name)),
+ m_opaque_owned(true) {
LLDB_INSTRUMENT_VA(this, broadcaster_name);
}
@@ -169,5 +170,5 @@ SBBroadcaster SBCommunication::GetBroadcaster() {
const char *SBCommunication::GetBroadcasterClass() {
LLDB_INSTRUMENT();
- return Communication::GetStaticBroadcasterClass().AsCString();
+ return ThreadedCommunication::GetStaticBroadcasterClass().AsCString();
}
diff --git a/lldb/source/Core/CMakeLists.txt b/lldb/source/Core/CMakeLists.txt
index d907177586e78..c578656463f8e 100644
--- a/lldb/source/Core/CMakeLists.txt
+++ b/lldb/source/Core/CMakeLists.txt
@@ -54,6 +54,7 @@ add_lldb_library(lldbCore
SourceManager.cpp
StreamAsynchronousIO.cpp
StreamFile.cpp
+ ThreadedCommunication.cpp
UserSettingsController.cpp
Value.cpp
ValueObject.cpp
diff --git a/lldb/source/Core/Communication.cpp b/lldb/source/Core/Communication.cpp
index 3606d92a9d4e0..5d890632ccc6a 100644
--- a/lldb/source/Core/Communication.cpp
+++ b/lldb/source/Core/Communication.cpp
@@ -8,22 +8,14 @@
#include "lldb/Core/Communication.h"
-#include "lldb/Host/HostThread.h"
-#include "lldb/Host/ThreadLauncher.h"
#include "lldb/Utility/Connection.h"
-#include "lldb/Utility/ConstString.h"
-#include "lldb/Utility/Event.h"
#include "lldb/Utility/LLDBLog.h"
-#include "lldb/Utility/Listener.h"
#include "lldb/Utility/Log.h"
#include "lldb/Utility/Status.h"
-#include "llvm/ADT/None.h"
-#include "llvm/ADT/Optional.h"
#include "llvm/Support/Compiler.h"
#include <algorithm>
-#include <chrono>
#include <cstring>
#include <memory>
@@ -34,42 +26,15 @@
using namespace lldb;
using namespace lldb_private;
-ConstString &Communication::GetStaticBroadcasterClass() {
- static ConstString class_name("lldb.communication");
- return class_name;
-}
-
-Communication::Communication(const char *name)
- : Broadcaster(nullptr, name), m_connection_sp(),
- m_read_thread_enabled(false), m_read_thread_did_exit(false), m_bytes(),
- m_bytes_mutex(), m_write_mutex(), m_synchronize_mutex(),
- m_callback(nullptr), m_callback_baton(nullptr), m_close_on_eof(true)
-
-{
-
- LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
- "{0} Communication::Communication (name = {1})", this, name);
-
- SetEventName(eBroadcastBitDisconnected, "disconnected");
- SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
- SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
- SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
- SetEventName(eBroadcastBitPacketAvailable, "packet available");
- SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
-
- CheckInWithManager();
+Communication::Communication()
+ : m_connection_sp(), m_write_mutex(), m_close_on_eof(true) {
}
Communication::~Communication() {
- LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
- "{0} Communication::~Communication (name = {1})", this,
- GetBroadcasterName().AsCString());
Clear();
}
void Communication::Clear() {
- SetReadThreadBytesReceivedCallback(nullptr, nullptr);
- StopReadThread(nullptr);
Disconnect(nullptr);
}
@@ -91,8 +56,6 @@ ConnectionStatus Communication::Disconnect(Status *error_ptr) {
LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()",
this);
- assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
- "Disconnecting while the read thread is running is racy!");
lldb::ConnectionSP connection_sp(m_connection_sp);
if (connection_sp) {
ConnectionStatus status = connection_sp->Disconnect(error_ptr);
@@ -129,58 +92,6 @@ size_t Communication::Read(void *dst, size_t dst_len,
"this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
this, dst, dst_len, timeout, m_connection_sp.get());
- if (m_read_thread_enabled) {
- // We have a dedicated read thread that is getting data for us
- size_t cached_bytes = GetCachedBytes(dst, dst_len);
- if (cached_bytes > 0) {
- status = eConnectionStatusSuccess;
- return cached_bytes;
- }
- if (timeout && timeout->count() == 0) {
- if (error_ptr)
- error_ptr->SetErrorString("Timed out.");
- status = eConnectionStatusTimedOut;
- return 0;
- }
-
- if (!m_connection_sp) {
- if (error_ptr)
- error_ptr->SetErrorString("Invalid connection.");
- status = eConnectionStatusNoConnection;
- return 0;
- }
-
- ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
- listener_sp->StartListeningForEvents(
- this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
- EventSP event_sp;
- while (listener_sp->GetEvent(event_sp, timeout)) {
- const uint32_t event_type = event_sp->GetType();
- if (event_type & eBroadcastBitReadThreadGotBytes) {
- return GetCachedBytes(dst, dst_len);
- }
-
- if (event_type & eBroadcastBitReadThreadDidExit) {
- // If the thread exited of its own accord, it either means it
- // hit an end-of-file condition or an error.
- status = m_pass_status;
- if (error_ptr)
- *error_ptr = std::move(m_pass_error);
-
- if (GetCloseOnEOF())
- Disconnect(nullptr);
- return 0;
- }
- }
-
- if (error_ptr)
- error_ptr->SetErrorString("Timed out.");
- status = eConnectionStatusTimedOut;
- return 0;
- }
-
- // We aren't using a read thread, just read the data synchronously in this
- // thread.
return ReadFromConnection(dst, dst_len, timeout, status, error_ptr);
}
@@ -213,104 +124,6 @@ size_t Communication::WriteAll(const void *src, size_t src_len,
return total_written;
}
-bool Communication::StartReadThread(Status *error_ptr) {
- if (error_ptr)
- error_ptr->Clear();
-
- if (m_read_thread.IsJoinable())
- return true;
-
- LLDB_LOG(GetLog(LLDBLog::Communication),
- "{0} Communication::StartReadThread ()", this);
-
- const std::string thread_name =
- llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
-
- m_read_thread_enabled = true;
- m_read_thread_did_exit = false;
- auto maybe_thread = ThreadLauncher::LaunchThread(
- thread_name, [this] { return ReadThread(); });
- if (maybe_thread) {
- m_read_thread = *maybe_thread;
- } else {
- if (error_ptr)
- *error_ptr = Status(maybe_thread.takeError());
- else {
- LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
- llvm::toString(maybe_thread.takeError()));
- }
- }
-
- if (!m_read_thread.IsJoinable())
- m_read_thread_enabled = false;
-
- return m_read_thread_enabled;
-}
-
-bool Communication::StopReadThread(Status *error_ptr) {
- if (!m_read_thread.IsJoinable())
- return true;
-
- LLDB_LOG(GetLog(LLDBLog::Communication),
- "{0} Communication::StopReadThread ()", this);
-
- m_read_thread_enabled = false;
-
- BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
-
- // error = m_read_thread.Cancel();
-
- Status error = m_read_thread.Join(nullptr);
- return error.Success();
-}
-
-bool Communication::JoinReadThread(Status *error_ptr) {
- if (!m_read_thread.IsJoinable())
- return true;
-
- Status error = m_read_thread.Join(nullptr);
- return error.Success();
-}
-
-size_t Communication::GetCachedBytes(void *dst, size_t dst_len) {
- std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
- if (!m_bytes.empty()) {
- // If DST is nullptr and we have a thread, then return the number of bytes
- // that are available so the caller can call again
- if (dst == nullptr)
- return m_bytes.size();
-
- const size_t len = std::min<size_t>(dst_len, m_bytes.size());
-
- ::memcpy(dst, m_bytes.c_str(), len);
- m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
-
- return len;
- }
- return 0;
-}
-
-void Communication::AppendBytesToCache(const uint8_t *bytes, size_t len,
- bool broadcast,
- ConnectionStatus status) {
- LLDB_LOG(GetLog(LLDBLog::Communication),
- "{0} Communication::AppendBytesToCache (src = {1}, src_len = {2}, "
- "broadcast = {3})",
- this, bytes, (uint64_t)len, broadcast);
- if ((bytes == nullptr || len == 0) &&
- (status != lldb::eConnectionStatusEndOfFile))
- return;
- if (m_callback) {
- // If the user registered a callback, then call it and do not broadcast
- m_callback(m_callback_baton, bytes, len);
- } else if (bytes != nullptr && len > 0) {
- std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
- m_bytes.append((const char *)bytes, len);
- if (broadcast)
- BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
- }
-}
-
size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
ConnectionStatus &status,
@@ -325,115 +138,8 @@ size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
return 0;
}
-bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; }
-
-lldb::thread_result_t Communication::ReadThread() {
- Log *log = GetLog(LLDBLog::Communication);
-
- LLDB_LOG(log, "Communication({0}) thread starting...", this);
-
- uint8_t buf[1024];
-
- Status error;
- ConnectionStatus status = eConnectionStatusSuccess;
- bool done = false;
- bool disconnect = false;
- while (!done && m_read_thread_enabled) {
- size_t bytes_read = ReadFromConnection(
- buf, sizeof(buf), std::chrono::seconds(5), status, &error);
- if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
- AppendBytesToCache(buf, bytes_read, true, status);
-
- switch (status) {
- case eConnectionStatusSuccess:
- break;
-
- case eConnectionStatusEndOfFile:
- done = true;
- disconnect = GetCloseOnEOF();
- break;
- case eConnectionStatusError: // Check GetError() for details
- if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
- // EIO on a pipe is usually caused by remote shutdown
- disconnect = GetCloseOnEOF();
- done = true;
- }
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- Communication::ConnectionStatusAsString(status));
- break;
- case eConnectionStatusInterrupted: // Synchronization signal from
- // SynchronizeWithReadThread()
- // The connection returns eConnectionStatusInterrupted only when there is
- // no input pending to be read, so we can signal that.
- BroadcastEvent(eBroadcastBitNoMorePendingInput);
- break;
- case eConnectionStatusNoConnection: // No connection
- case eConnectionStatusLostConnection: // Lost connection while connected to
- // a valid connection
- done = true;
- [[fallthrough]];
- case eConnectionStatusTimedOut: // Request timed out
- if (error.Fail())
- LLDB_LOG(log, "error: {0}, status = {1}", error,
- Communication::ConnectionStatusAsString(status));
- break;
- }
- }
- m_pass_status = status;
- m_pass_error = std::move(error);
- LLDB_LOG(log, "Communication({0}) thread exiting...", this);
-
- // Handle threads wishing to synchronize with us.
- {
- // Prevent new ones from showing up.
- m_read_thread_did_exit = true;
-
- // Unblock any existing thread waiting for the synchronization signal.
- BroadcastEvent(eBroadcastBitNoMorePendingInput);
-
- // Wait for the thread to finish...
- std::lock_guard<std::mutex> guard(m_synchronize_mutex);
- // ... and disconnect.
- if (disconnect)
- Disconnect();
- }
-
- // Let clients know that this thread is exiting
- BroadcastEvent(eBroadcastBitReadThreadDidExit);
- return {};
-}
-
-void Communication::SetReadThreadBytesReceivedCallback(
- ReadThreadBytesReceived callback, void *callback_baton) {
- m_callback = callback;
- m_callback_baton = callback_baton;
-}
-
-void Communication::SynchronizeWithReadThread() {
- // Only one thread can do the synchronization dance at a time.
- std::lock_guard<std::mutex> guard(m_synchronize_mutex);
-
- // First start listening for the synchronization event.
- ListenerSP listener_sp(
- Listener::MakeListener("Communication::SyncronizeWithReadThread"));
- listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
-
- // If the thread is not running, there is no point in synchronizing.
- if (!m_read_thread_enabled || m_read_thread_did_exit)
- return;
-
- // Notify the read thread.
- m_connection_sp->InterruptRead();
-
- // Wait for the synchronization event.
- EventSP event_sp;
- listener_sp->GetEvent(event_sp, llvm::None);
-}
-
void Communication::SetConnection(std::unique_ptr<Connection> connection) {
Disconnect(nullptr);
- StopReadThread(nullptr);
m_connection_sp = std::move(connection);
}
diff --git a/lldb/source/Core/ThreadedCommunication.cpp b/lldb/source/Core/ThreadedCommunication.cpp
new file mode 100644
index 0000000000000..ab89b474769a5
--- /dev/null
+++ b/lldb/source/Core/ThreadedCommunication.cpp
@@ -0,0 +1,353 @@
+//===-- ThreadedCommunication.cpp -----------------------------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "lldb/Core/ThreadedCommunication.h"
+
+#include "lldb/Host/ThreadLauncher.h"
+#include "lldb/Utility/Connection.h"
+#include "lldb/Utility/ConstString.h"
+#include "lldb/Utility/Event.h"
+#include "lldb/Utility/LLDBLog.h"
+#include "lldb/Utility/Listener.h"
+#include "lldb/Utility/Log.h"
+#include "lldb/Utility/Status.h"
+
+#include "llvm/ADT/None.h"
+#include "llvm/Support/Compiler.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstring>
+#include <memory>
+
+#include <cerrno>
+#include <cinttypes>
+#include <cstdio>
+
+using namespace lldb;
+using namespace lldb_private;
+
+ConstString &ThreadedCommunication::GetStaticBroadcasterClass() {
+ static ConstString class_name("lldb.communication");
+ return class_name;
+}
+
+ThreadedCommunication::ThreadedCommunication(const char *name)
+ : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
+ m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),
+ m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
+ LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
+ "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
+ this, name);
+
+ SetEventName(eBroadcastBitDisconnected, "disconnected");
+ SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
+ SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
+ SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
+ SetEventName(eBroadcastBitPacketAvailable, "packet available");
+ SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
+
+ CheckInWithManager();
+}
+
+ThreadedCommunication::~ThreadedCommunication() {
+ LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
+ "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
+ this, GetBroadcasterName().AsCString());
+}
+
+void ThreadedCommunication::Clear() {
+ SetReadThreadBytesReceivedCallback(nullptr, nullptr);
+ StopReadThread(nullptr);
+ Communication::Clear();
+}
+
+ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
+ assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
+ "Disconnecting while the read thread is running is racy!");
+ return Communication::Disconnect(error_ptr);
+}
+
+size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
+ const Timeout<std::micro> &timeout,
+ ConnectionStatus &status,
+ Status *error_ptr) {
+ Log *log = GetLog(LLDBLog::Communication);
+ LLDB_LOG(
+ log,
+ "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
+ this, dst, dst_len, timeout, m_connection_sp.get());
+
+ if (m_read_thread_enabled) {
+ // We have a dedicated read thread that is getting data for us
+ size_t cached_bytes = GetCachedBytes(dst, dst_len);
+ if (cached_bytes > 0) {
+ status = eConnectionStatusSuccess;
+ return cached_bytes;
+ }
+ if (timeout && timeout->count() == 0) {
+ if (error_ptr)
+ error_ptr->SetErrorString("Timed out.");
+ status = eConnectionStatusTimedOut;
+ return 0;
+ }
+
+ if (!m_connection_sp) {
+ if (error_ptr)
+ error_ptr->SetErrorString("Invalid connection.");
+ status = eConnectionStatusNoConnection;
+ return 0;
+ }
+
+ ListenerSP listener_sp(
+ Listener::MakeListener("ThreadedCommunication::Read"));
+ listener_sp->StartListeningForEvents(
+ this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
+ EventSP event_sp;
+ while (listener_sp->GetEvent(event_sp, timeout)) {
+ const uint32_t event_type = event_sp->GetType();
+ if (event_type & eBroadcastBitReadThreadGotBytes) {
+ return GetCachedBytes(dst, dst_len);
+ }
+
+ if (event_type & eBroadcastBitReadThreadDidExit) {
+ // If the thread exited of its own accord, it either means it
+ // hit an end-of-file condition or an error.
+ status = m_pass_status;
+ if (error_ptr)
+ *error_ptr = std::move(m_pass_error);
+
+ if (GetCloseOnEOF())
+ Disconnect(nullptr);
+ return 0;
+ }
+ }
+
+ if (error_ptr)
+ error_ptr->SetErrorString("Timed out.");
+ status = eConnectionStatusTimedOut;
+ return 0;
+ }
+
+ // We aren't using a read thread, just read the data synchronously in this
+ // thread.
+ return Communication::Read(dst, dst_len, timeout, status, error_ptr);
+}
+
+bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
+ if (error_ptr)
+ error_ptr->Clear();
+
+ if (m_read_thread.IsJoinable())
+ return true;
+
+ LLDB_LOG(GetLog(LLDBLog::Communication),
+ "{0} ThreadedCommunication::StartReadThread ()", this);
+
+ const std::string thread_name =
+ llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
+
+ m_read_thread_enabled = true;
+ m_read_thread_did_exit = false;
+ auto maybe_thread = ThreadLauncher::LaunchThread(
+ thread_name, [this] { return ReadThread(); });
+ if (maybe_thread) {
+ m_read_thread = *maybe_thread;
+ } else {
+ if (error_ptr)
+ *error_ptr = Status(maybe_thread.takeError());
+ else {
+ LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
+ llvm::toString(maybe_thread.takeError()));
+ }
+ }
+
+ if (!m_read_thread.IsJoinable())
+ m_read_thread_enabled = false;
+
+ return m_read_thread_enabled;
+}
+
+bool ThreadedCommunication::StopReadThread(Status *error_ptr) {
+ if (!m_read_thread.IsJoinable())
+ return true;
+
+ LLDB_LOG(GetLog(LLDBLog::Communication),
+ "{0} ThreadedCommunication::StopReadThread ()", this);
+
+ m_read_thread_enabled = false;
+
+ BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
+
+ // error = m_read_thread.Cancel();
+
+ Status error = m_read_thread.Join(nullptr);
+ return error.Success();
+}
+
+bool ThreadedCommunication::JoinReadThread(Status *error_ptr) {
+ if (!m_read_thread.IsJoinable())
+ return true;
+
+ Status error = m_read_thread.Join(nullptr);
+ return error.Success();
+}
+
+size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
+ std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
+ if (!m_bytes.empty()) {
+ // If DST is nullptr and we have a thread, then return the number of bytes
+ // that are available so the caller can call again
+ if (dst == nullptr)
+ return m_bytes.size();
+
+ const size_t len = std::min<size_t>(dst_len, m_bytes.size());
+
+ ::memcpy(dst, m_bytes.c_str(), len);
+ m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
+
+ return len;
+ }
+ return 0;
+}
+
+void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
+ bool broadcast,
+ ConnectionStatus status) {
+ LLDB_LOG(GetLog(LLDBLog::Communication),
+ "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
+ "= {2}, "
+ "broadcast = {3})",
+ this, bytes, (uint64_t)len, broadcast);
+ if ((bytes == nullptr || len == 0) &&
+ (status != lldb::eConnectionStatusEndOfFile))
+ return;
+ if (m_callback) {
+ // If the user registered a callback, then call it and do not broadcast
+ m_callback(m_callback_baton, bytes, len);
+ } else if (bytes != nullptr && len > 0) {
+ std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
+ m_bytes.append((const char *)bytes, len);
+ if (broadcast)
+ BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
+ }
+}
+
+bool ThreadedCommunication::ReadThreadIsRunning() {
+ return m_read_thread_enabled;
+}
+
+lldb::thread_result_t ThreadedCommunication::ReadThread() {
+ Log *log = GetLog(LLDBLog::Communication);
+
+ LLDB_LOG(log, "Communication({0}) thread starting...", this);
+
+ uint8_t buf[1024];
+
+ Status error;
+ ConnectionStatus status = eConnectionStatusSuccess;
+ bool done = false;
+ bool disconnect = false;
+ while (!done && m_read_thread_enabled) {
+ size_t bytes_read = ReadFromConnection(
+ buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+ if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+ AppendBytesToCache(buf, bytes_read, true, status);
+
+ switch (status) {
+ case eConnectionStatusSuccess:
+ break;
+
+ case eConnectionStatusEndOfFile:
+ done = true;
+ disconnect = GetCloseOnEOF();
+ break;
+ case eConnectionStatusError: // Check GetError() for details
+ if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+ // EIO on a pipe is usually caused by remote shutdown
+ disconnect = GetCloseOnEOF();
+ done = true;
+ }
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ ThreadedCommunication::ConnectionStatusAsString(status));
+ break;
+ case eConnectionStatusInterrupted: // Synchronization signal from
+ // SynchronizeWithReadThread()
+ // The connection returns eConnectionStatusInterrupted only when there is
+ // no input pending to be read, so we can signal that.
+ BroadcastEvent(eBroadcastBitNoMorePendingInput);
+ break;
+ case eConnectionStatusNoConnection: // No connection
+ case eConnectionStatusLostConnection: // Lost connection while connected to
+ // a valid connection
+ done = true;
+ [[fallthrough]];
+ case eConnectionStatusTimedOut: // Request timed out
+ if (error.Fail())
+ LLDB_LOG(log, "error: {0}, status = {1}", error,
+ ThreadedCommunication::ConnectionStatusAsString(status));
+ break;
+ }
+ }
+ m_pass_status = status;
+ m_pass_error = std::move(error);
+ LLDB_LOG(log, "Communication({0}) thread exiting...", this);
+
+ // Handle threads wishing to synchronize with us.
+ {
+ // Prevent new ones from showing up.
+ m_read_thread_did_exit = true;
+
+ // Unblock any existing thread waiting for the synchronization signal.
+ BroadcastEvent(eBroadcastBitNoMorePendingInput);
+
+ // Wait for the thread to finish...
+ std::lock_guard<std::mutex> guard(m_synchronize_mutex);
+ // ... and disconnect.
+ if (disconnect)
+ Disconnect();
+ }
+
+ // Let clients know that this thread is exiting
+ BroadcastEvent(eBroadcastBitReadThreadDidExit);
+ return {};
+}
+
+void ThreadedCommunication::SetReadThreadBytesReceivedCallback(
+ ReadThreadBytesReceived callback, void *callback_baton) {
+ m_callback = callback;
+ m_callback_baton = callback_baton;
+}
+
+void ThreadedCommunication::SynchronizeWithReadThread() {
+ // Only one thread can do the synchronization dance at a time.
+ std::lock_guard<std::mutex> guard(m_synchronize_mutex);
+
+ // First start listening for the synchronization event.
+ ListenerSP listener_sp(Listener::MakeListener(
+ "ThreadedCommunication::SyncronizeWithReadThread"));
+ listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
+
+ // If the thread is not running, there is no point in synchronizing.
+ if (!m_read_thread_enabled || m_read_thread_did_exit)
+ return;
+
+ // Notify the read thread.
+ m_connection_sp->InterruptRead();
+
+ // Wait for the synchronization event.
+ EventSP event_sp;
+ listener_sp->GetEvent(event_sp, llvm::None);
+}
+
+void ThreadedCommunication::SetConnection(
+ std::unique_ptr<Connection> connection) {
+ StopReadThread(nullptr);
+ Communication::SetConnection(std::move(connection));
+}
diff --git a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
index 343ab6bb17900..a63b437befc9a 100644
--- a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
+++ b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
@@ -29,7 +29,7 @@ using namespace lldb_private;
// CommunicationKDP constructor
CommunicationKDP::CommunicationKDP(const char *comm_name)
- : Communication(comm_name), m_addr_byte_size(4),
+ : Communication(), m_addr_byte_size(4),
m_byte_order(eByteOrderLittle), m_packet_timeout(5), m_sequence_mutex(),
m_is_running(false), m_session_key(0u), m_request_sequence_id(0u),
m_exception_sequence_id(0u), m_kdp_version_version(0u),
diff --git a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
index c70f7de5bdcf5..f7ab9a176b475 100644
--- a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
+++ b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
@@ -21,8 +21,6 @@
class CommunicationKDP : public lldb_private::Communication {
public:
- enum { eBroadcastBitRunPacketSent = kLoUserBroadcastBit };
-
const static uint32_t kMaxPacketSize = 1200;
const static uint32_t kMaxDataSize = 1024;
typedef lldb_private::StreamBuffer<4096> PacketStreamType;
diff --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
index 616de5148a5c1..33e041bb6e3e8 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
@@ -60,7 +60,7 @@ using namespace lldb_private::process_gdb_remote;
// GDBRemoteCommunication constructor
GDBRemoteCommunication::GDBRemoteCommunication(const char *comm_name,
const char *listener_name)
- : Communication(comm_name),
+ : Communication(), Broadcaster(nullptr, comm_name),
#ifdef LLDB_CONFIGURATION_DEBUG
m_packet_timeout(1000),
#else
diff --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
index 35e86c202b5b3..22737ab726c7e 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
@@ -80,10 +80,10 @@ enum GDBErrno {
class ProcessGDBRemote;
-class GDBRemoteCommunication : public Communication {
+class GDBRemoteCommunication : public Communication, public Broadcaster {
public:
enum {
- eBroadcastBitRunPacketSent = kLoUserBroadcastBit,
+ eBroadcastBitRunPacketSent = (1u << 0),
};
enum class PacketType { Invalid = 0, Standard, Notify };
@@ -180,6 +180,8 @@ class GDBRemoteCommunication : public Communication {
// false if this class represents a debug session for
// a single process
+ std::string m_bytes;
+ std::recursive_mutex m_bytes_mutex;
CompressionType m_compression_type;
PacketResult SendPacketNoLock(llvm::StringRef payload);
diff --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
index bc9ccad29afb1..6ca5c41cc1d01 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
@@ -74,7 +74,7 @@ GDBRemoteCommunicationServerLLGS::GDBRemoteCommunicationServerLLGS(
"gdb-remote.server.rx_packet"),
m_mainloop(mainloop), m_process_factory(process_factory),
m_current_process(nullptr), m_continue_process(nullptr),
- m_stdio_communication("process.stdio") {
+ m_stdio_communication() {
RegisterPacketHandlers();
}
diff --git a/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp b/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
index 37272d3afabd3..c0765dcc65a2c 100644
--- a/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
@@ -285,14 +285,6 @@ ProcessGDBRemote::ProcessGDBRemote(lldb::TargetSP target_sp,
__FUNCTION__);
}
- const uint32_t gdb_event_mask = Communication::eBroadcastBitReadThreadDidExit;
- if (m_async_listener_sp->StartListeningForEvents(
- &m_gdb_comm, gdb_event_mask) != gdb_event_mask) {
- LLDB_LOGF(log,
- "ProcessGDBRemote::%s failed to listen for m_gdb_comm events",
- __FUNCTION__);
- }
-
const uint64_t timeout_seconds =
GetGlobalPluginProperties().GetPacketTimeout();
if (timeout_seconds > 0)
@@ -3567,21 +3559,6 @@ thread_result_t ProcessGDBRemote::AsyncThread() {
done = true;
break;
- default:
- LLDB_LOGF(log,
- "ProcessGDBRemote::%s(pid = %" PRIu64
- ") got unknown event 0x%8.8x",
- __FUNCTION__, GetID(), event_type);
- done = true;
- break;
- }
- } else if (event_sp->BroadcasterIs(&m_gdb_comm)) {
- switch (event_type) {
- case Communication::eBroadcastBitReadThreadDidExit:
- SetExitStatus(-1, "lost connection");
- done = true;
- break;
-
default:
LLDB_LOGF(log,
"ProcessGDBRemote::%s(pid = %" PRIu64
diff --git a/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp b/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
index d530936484b96..d0f67a5684c5e 100644
--- a/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
+++ b/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
@@ -25,9 +25,9 @@
#include "lldb/API/SBValue.h"
#include "lldb/Breakpoint/StoppointCallbackContext.h"
#include "lldb/Breakpoint/WatchpointOptions.h"
-#include "lldb/Core/Communication.h"
#include "lldb/Core/Debugger.h"
#include "lldb/Core/PluginManager.h"
+#include "lldb/Core/ThreadedCommunication.h"
#include "lldb/Core/ValueObject.h"
#include "lldb/DataFormatters/TypeSummary.h"
#include "lldb/Host/FileSystem.h"
diff --git a/lldb/unittests/Core/CommunicationTest.cpp b/lldb/unittests/Core/CommunicationTest.cpp
index a6da6302e5e14..19ca4cb3d2493 100644
--- a/lldb/unittests/Core/CommunicationTest.cpp
+++ b/lldb/unittests/Core/CommunicationTest.cpp
@@ -7,6 +7,7 @@
//===----------------------------------------------------------------------===//
#include "lldb/Core/Communication.h"
+#include "lldb/Core/ThreadedCommunication.h"
#include "lldb/Host/Config.h"
#include "lldb/Host/ConnectionFileDescriptor.h"
#include "lldb/Host/Pipe.h"
@@ -37,7 +38,7 @@ static void CommunicationReadTest(bool use_read_thread) {
ASSERT_THAT_ERROR(a->Write("test", num_bytes).ToError(), llvm::Succeeded());
ASSERT_EQ(num_bytes, 4U);
- Communication comm("test");
+ ThreadedCommunication comm("test");
comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
comm.SetCloseOnEOF(true);
@@ -118,7 +119,7 @@ TEST_F(CommunicationTest, SynchronizeWhileClosing) {
std::unique_ptr<TCPSocket> a, b;
ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
- Communication comm("test");
+ ThreadedCommunication comm("test");
comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
comm.SetCloseOnEOF(true);
ASSERT_TRUE(comm.StartReadThread());
@@ -146,7 +147,7 @@ TEST_F(CommunicationTest, WriteAll) {
ConnectionFileDescriptor read_conn{pipe.ReleaseReadFileDescriptor(),
/*owns_fd=*/true};
- Communication write_comm("test");
+ Communication write_comm;
write_comm.SetConnection(
std::make_unique<ConnectionFileDescriptor>(write_fd, /*owns_fd=*/true));
More information about the lldb-commits
mailing list