[Lldb-commits] [lldb] 9823d42 - [lldb] [Core] Split read thread support into ThreadedCommunication

Michał Górny via lldb-commits lldb-commits at lists.llvm.org
Tue Sep 6 04:09:55 PDT 2022


Author: Michał Górny
Date: 2022-09-06T13:09:42+02:00
New Revision: 9823d42557eb1da3ecf2f771ea2cbc84a988ef92

URL: https://github.com/llvm/llvm-project/commit/9823d42557eb1da3ecf2f771ea2cbc84a988ef92
DIFF: https://github.com/llvm/llvm-project/commit/9823d42557eb1da3ecf2f771ea2cbc84a988ef92.diff

LOG: [lldb] [Core] Split read thread support into ThreadedCommunication

Split the read thread support from Communication into a dedicated
ThreadedCommunication subclass.  The read thread support is used only
by a subset of Communication consumers, and it adds a lot of complexity
to the base class.  Furthermore, having a dedicated subclass makes it
clear whether a particular consumer needs to account for the possibility
of a running read thread.

The modules currently calling `StartReadThread()` are updated to use
`ThreadedCommunication`.  The remaining modules use the simplified
`Communication` class.
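
The shape of the split can be sketched with standard C++ only.  This is
a toy model, not the LLDB code: `ToyConnection`, `ToyCommunication`,
`ToyThreadedCommunication` and `ReadExactly` are hypothetical names, and
the threaded `Read()` here simply returns whatever is cached instead of
waiting on a broadcast event.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for a connection: hands out a fixed payload once.
class ToyConnection {
public:
  explicit ToyConnection(std::string payload) : m_payload(std::move(payload)) {}
  // Copies up to dst_len bytes into dst; returns 0 once the payload is drained.
  std::size_t Read(char *dst, std::size_t dst_len) {
    assert(dst != nullptr);
    std::lock_guard<std::mutex> guard(m_mutex);
    const std::size_t n = std::min(dst_len, m_payload.size());
    std::memcpy(dst, m_payload.data(), n);
    m_payload.erase(0, n);
    return n;
  }

private:
  std::mutex m_mutex;
  std::string m_payload;
};

// Simplified base class: every read happens on the calling thread.
class ToyCommunication {
public:
  explicit ToyCommunication(ToyConnection &conn) : m_conn(conn) {}
  virtual ~ToyCommunication() = default;
  virtual std::size_t Read(char *dst, std::size_t dst_len) {
    return m_conn.Read(dst, dst_len);
  }

protected:
  ToyConnection &m_conn;
};

// Subclass that owns all read-thread state and overrides Read().
class ToyThreadedCommunication : public ToyCommunication {
public:
  using ToyCommunication::ToyCommunication;
  ~ToyThreadedCommunication() override { StopReadThread(); }

  void StartReadThread() {
    if (m_thread.joinable())
      return;
    m_stop = false;
    m_thread = std::thread([this] { ReadThread(); });
  }
  void StopReadThread() {
    m_stop = true;
    if (m_thread.joinable())
      m_thread.join();
  }
  bool ReadThreadIsRunning() const { return m_thread.joinable(); }

  // Without a read thread, fall back to the synchronous base-class read;
  // otherwise serve bytes from the cache filled by the read thread.
  std::size_t Read(char *dst, std::size_t dst_len) override {
    if (!ReadThreadIsRunning())
      return ToyCommunication::Read(dst, dst_len);
    std::lock_guard<std::mutex> guard(m_bytes_mutex);
    const std::size_t n = std::min(dst_len, m_bytes.size());
    std::memcpy(dst, m_bytes.data(), n);
    m_bytes.erase(0, n);
    return n;
  }

private:
  void ReadThread() {
    char buf[64];
    while (!m_stop) {
      const std::size_t n = m_conn.Read(buf, sizeof(buf));
      if (n == 0) {
        std::this_thread::yield();
        continue;
      }
      std::lock_guard<std::mutex> guard(m_bytes_mutex);
      m_bytes.append(buf, n);
    }
  }

  std::atomic<bool> m_stop{false};
  std::mutex m_bytes_mutex;
  std::string m_bytes;
  std::thread m_thread;
};

// Consumer code written against the base class works with either variant.
// Polls until `count` bytes arrive, so the payload must hold at least that many.
std::string ReadExactly(ToyCommunication &comm, std::size_t count) {
  std::string out;
  char buf[16];
  while (out.size() < count) {
    const std::size_t n =
        comm.Read(buf, std::min(sizeof(buf), count - out.size()));
    out.append(buf, n);
    if (n == 0)
      std::this_thread::yield();
  }
  return out;
}
```

In this sketch a consumer that never starts a read thread can hold the
base class and skip the cache, its mutex and the thread handle entirely,
which is the simplification the commit is after.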

`SBCommunication` is changed to use `ThreadedCommunication` in order
to avoid changing the public API.

`CommunicationKDP` is updated in order to (hopefully) compile with
the new code.  However, I do not have a Darwin box to test it, so I've
limited the changes to the bare minimum.

`GDBRemoteCommunication` is updated to become a `Broadcaster` directly.
Since it does not inherit from `ThreadedCommunication`, its event
support no longer collides with the one used for the read thread and can
be implemented cleanly.  The support for
`eBroadcastBitReadThreadDidExit` is removed from the code -- since
the read thread was not used, this event was never reported.
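
As a rough illustration of "becoming a `Broadcaster` directly", the toy
below combines a plain communication base with a broadcaster base and
defines its own event bits from bit 0.  All names here
(`ToyBroadcaster`, `ToyPacketCommunication`, the two event bits) are
hypothetical and do not mirror the real `GDBRemoteCommunication` events.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical, minimal broadcaster: only records which bits were posted.
class ToyBroadcaster {
public:
  explicit ToyBroadcaster(std::string name) : m_name(std::move(name)) {}
  void BroadcastEvent(std::uint32_t bits) { m_posted |= bits; }
  bool EventWasPosted(std::uint32_t bit) const {
    return (m_posted & bit) != 0;
  }
  const std::string &GetName() const { return m_name; }

private:
  std::string m_name;
  std::uint32_t m_posted = 0;
};

// Plain synchronous communication: no broadcaster base, no event bits.
class ToyCommunication {
public:
  virtual ~ToyCommunication() = default;
};

// A consumer that wants events but no read thread.  Because it does not
// inherit any read-thread event enum, its own bits can start at bit 0.
class ToyPacketCommunication : public ToyCommunication,
                               public ToyBroadcaster {
public:
  enum : std::uint32_t {
    eBroadcastBitPacketSent = (1u << 0), // assumed name, for illustration
    eBroadcastBitShutdown = (1u << 1),   // assumed name, for illustration
  };

  ToyPacketCommunication() : ToyBroadcaster("toy.packet-communication") {}

  void NotifyPacketSent() { BroadcastEvent(eBroadcastBitPacketSent); }
};
```

The old `Communication` enum in the diff below reserved bits 31:16 for
subclasses through `kLoUserBroadcastBit`; a class that is its own
broadcaster needs no such reservation.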

Sponsored by: The FreeBSD Foundation
Differential Revision: https://reviews.llvm.org/D133251

Added: 
    lldb/include/lldb/Core/ThreadedCommunication.h
    lldb/source/Core/ThreadedCommunication.cpp

Modified: 
    lldb/include/lldb/API/SBCommunication.h
    lldb/include/lldb/Core/Communication.h
    lldb/include/lldb/Interpreter/ScriptInterpreter.h
    lldb/include/lldb/Target/Process.h
    lldb/include/lldb/lldb-forward.h
    lldb/source/API/SBCommunication.cpp
    lldb/source/Core/CMakeLists.txt
    lldb/source/Core/Communication.cpp
    lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
    lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
    lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
    lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
    lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
    lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
    lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
    lldb/unittests/Core/CommunicationTest.cpp

Removed: 
    


################################################################################
diff --git a/lldb/include/lldb/API/SBCommunication.h b/lldb/include/lldb/API/SBCommunication.h
index e407d859d8850..fa8c88acd3fb8 100644
--- a/lldb/include/lldb/API/SBCommunication.h
+++ b/lldb/include/lldb/API/SBCommunication.h
@@ -75,7 +75,7 @@ class LLDB_API SBCommunication {
   SBCommunication(const SBCommunication &) = delete;
   const SBCommunication &operator=(const SBCommunication &) = delete;
 
-  lldb_private::Communication *m_opaque = nullptr;
+  lldb_private::ThreadedCommunication *m_opaque = nullptr;
   bool m_opaque_owned = false;
 };
 

diff --git a/lldb/include/lldb/Core/Communication.h b/lldb/include/lldb/Core/Communication.h
index c07b06a6c2509..a0151527fe5de 100644
--- a/lldb/include/lldb/Core/Communication.h
+++ b/lldb/include/lldb/Core/Communication.h
@@ -9,22 +9,15 @@
 #ifndef LLDB_CORE_COMMUNICATION_H
 #define LLDB_CORE_COMMUNICATION_H
 
-#include "lldb/Host/HostThread.h"
-#include "lldb/Utility/Broadcaster.h"
 #include "lldb/Utility/Timeout.h"
 #include "lldb/lldb-defines.h"
 #include "lldb/lldb-enumerations.h"
 #include "lldb/lldb-forward.h"
 #include "lldb/lldb-types.h"
 
-#include <atomic>
 #include <mutex>
-#include <ratio>
 #include <string>
 
-#include <cstddef>
-#include <cstdint>
-
 namespace lldb_private {
 class Connection;
 class ConstString;
@@ -38,90 +31,22 @@ class Status;
 /// approach has a couple of advantages: it allows a single instance of this
 /// class to be used even though its connection can change. Connections could
 /// negotiate for different connections based on abilities like starting with
-/// Bluetooth and negotiating up to WiFi if available. It also allows this
-/// class to be subclassed by any interfaces that don't want to give bytes but
-/// want to validate and give out packets. This can be done by overriding:
-///
-/// AppendBytesToCache (const uint8_t *src, size_t src_len, bool broadcast);
-///
-/// Communication inherits from Broadcaster which means it can be used in
-/// conjunction with Listener to wait for multiple broadcaster objects and
-/// multiple events from each of those objects. Communication defines a set of
-/// pre-defined event bits (see enumerations definitions that start with
-/// "eBroadcastBit" below).
-///
-/// There are two modes in which communications can occur:
-///     \li single-threaded
-///     \li multi-threaded
-///
-/// In single-threaded mode, all reads and writes happen synchronously on the
-/// calling thread.
-///
-/// In multi-threaded mode, a read thread is spawned that continually reads
-/// data and caches any received bytes. To start the read thread clients call:
-///
-///     bool Communication::StartReadThread (Status *);
-///
-/// If true is returned a read thread has been spawned that will continually
-/// execute a call to the pure virtual DoRead function:
+/// Bluetooth and negotiating up to WiFi if available.
 ///
-///     size_t Communication::ReadFromConnection (void *, size_t, uint32_t);
-///
-/// When bytes are received the data gets cached in \a m_bytes and this class
-/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that
-/// want packet based communication should override AppendBytesToCache. The
-/// subclasses can choose to call the built in AppendBytesToCache with the \a
-/// broadcast parameter set to false. This will cause the \b
-/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the
-/// subclass can post a \b eBroadcastBitPacketAvailable event when a full
-/// packet of data has been received.
-///
-/// If the connection is disconnected a \b eBroadcastBitDisconnected event
-/// gets broadcast. If the read thread exits a \b
-/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also
-/// post a \b eBroadcastBitReadThreadShouldExit event to this object which
-/// will cause the read thread to exit.
-class Communication : public Broadcaster {
+/// When using this class, all reads and writes happen synchronously on the
+/// calling thread. There is also a ThreadedCommunication class that supports
+/// multi-threaded mode.
+class Communication {
 public:
-  FLAGS_ANONYMOUS_ENUM(){
-      eBroadcastBitDisconnected =
-          (1u << 0), ///< Sent when the communications connection is lost.
-      eBroadcastBitReadThreadGotBytes =
-          (1u << 1), ///< Sent by the read thread when bytes become available.
-      eBroadcastBitReadThreadDidExit =
-          (1u
-           << 2), ///< Sent by the read thread when it exits to inform clients.
-      eBroadcastBitReadThreadShouldExit =
-          (1u << 3), ///< Sent by clients that need to cancel the read thread.
-      eBroadcastBitPacketAvailable =
-          (1u << 4), ///< Sent when data received makes a complete packet.
-      eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
-                                                   ///to indicate all pending
-                                                   ///input has been processed.
-      kLoUserBroadcastBit =
-          (1u << 16), ///< Subclasses can used bits 31:16 for any needed events.
-      kHiUserBroadcastBit = (1u << 31),
-      eAllEventBits = 0xffffffff};
-
-  typedef void (*ReadThreadBytesReceived)(void *baton, const void *src,
-                                          size_t src_len);
-
-  /// Construct the Communication object with the specified name for the
-  /// Broadcaster that this object inherits from.
-  ///
-  /// \param[in] broadcaster_name
-  ///     The name of the broadcaster object.  This name should be as
-  ///     complete as possible to uniquely identify this object. The
-  ///     broadcaster name can be updated after the connect function
-  ///     is called.
-  Communication(const char *broadcaster_name);
+  /// Construct the Communication object.
+  Communication();
 
   /// Destructor.
   ///
   /// The destructor is virtual since this class gets subclassed.
-  ~Communication() override;
+  virtual ~Communication();
 
-  void Clear();
+  virtual void Clear();
 
   /// Connect using the current connection by passing \a url to its connect
   /// function. string.
@@ -148,7 +73,7 @@ class Communication : public Broadcaster {
   ///
   /// \see Status& Communication::GetError ();
   /// \see bool Connection::Disconnect ();
-  lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);
+  virtual lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);
 
   /// Check if the connection is valid.
   ///
@@ -166,13 +91,6 @@ class Communication : public Broadcaster {
   /// If no read thread is running, this function call the connection's
   /// Connection::Read(...) function to get any available.
   ///
-  /// If a read thread has been started, this function will check for any
-  /// cached bytes that have already been read and return any currently
-  /// available bytes. If no bytes are cached, it will wait for the bytes to
-  /// become available by listening for the \a eBroadcastBitReadThreadGotBytes
-  /// event. If this function consumes all of the bytes in the cache, it will
-  /// reset the \a eBroadcastBitReadThreadGotBytes event bit.
-  ///
   /// \param[in] dst
   ///     A destination buffer that must be at least \a dst_len bytes
   ///     long.
@@ -188,8 +106,9 @@ class Communication : public Broadcaster {
   ///     The number of bytes actually read.
   ///
   /// \see size_t Connection::Read (void *, size_t);
-  size_t Read(void *dst, size_t dst_len, const Timeout<std::micro> &timeout,
-              lldb::ConnectionStatus &status, Status *error_ptr);
+  virtual size_t Read(void *dst, size_t dst_len,
+                      const Timeout<std::micro> &timeout,
+                      lldb::ConnectionStatus &status, Status *error_ptr);
 
   /// The actual write function that attempts to write to the communications
   /// protocol.
@@ -237,69 +156,7 @@ class Communication : public Broadcaster {
   ///
   /// \see
   ///     class Connection
-  void SetConnection(std::unique_ptr<Connection> connection);
-
-  /// Starts a read thread whose sole purpose it to read bytes from the
-  /// current connection. This function will call connection's read function:
-  ///
-  /// size_t Connection::Read (void *, size_t);
-  ///
-  /// When bytes are read and cached, this function will call:
-  ///
-  /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len,
-  /// bool
-  /// broadcast);
-  ///
-  /// Subclasses should override this function if they wish to override the
-  /// default action of caching the bytes and broadcasting a \b
-  /// eBroadcastBitReadThreadGotBytes event.
-  ///
-  /// \return
-  ///     \b True if the read thread was successfully started, \b
-  ///     false otherwise.
-  ///
-  /// \see size_t Connection::Read (void *, size_t);
-  /// \see void Communication::AppendBytesToCache (const uint8_t * bytes,
-  ///                                              size_t len, bool broadcast);
-  virtual bool StartReadThread(Status *error_ptr = nullptr);
-
-  /// Stops the read thread by cancelling it.
-  ///
-  /// \return
-  ///     \b True if the read thread was successfully canceled, \b
-  ///     false otherwise.
-  virtual bool StopReadThread(Status *error_ptr = nullptr);
-
-  virtual bool JoinReadThread(Status *error_ptr = nullptr);
-  /// Checks if there is a currently running read thread.
-  ///
-  /// \return
-  ///     \b True if the read thread is running, \b false otherwise.
-  bool ReadThreadIsRunning();
-
-  /// The read thread function. This function will call the "DoRead"
-  /// function continuously and wait for data to become available. When data
-  /// is received it will append the available data to the internal cache and
-  /// broadcast a \b eBroadcastBitReadThreadGotBytes event.
-  ///
-  /// \param[in] comm_ptr
-  ///     A pointer to an instance of this class.
-  ///
-  /// \return
-  ///     \b NULL.
-  ///
-  /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t);
-  lldb::thread_result_t ReadThread();
-
-  void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
-                                          void *callback_baton);
-
-  /// Wait for the read thread to process all outstanding data.
-  ///
-  /// After this function returns, the read thread has processed all data that
-  /// has been waiting in the Connection queue.
-  ///
-  void SynchronizeWithReadThread();
+  virtual void SetConnection(std::unique_ptr<Connection> connection);
 
   static std::string ConnectionStatusAsString(lldb::ConnectionStatus status);
 
@@ -307,76 +164,17 @@ class Communication : public Broadcaster {
 
   void SetCloseOnEOF(bool b) { m_close_on_eof = b; }
 
-  static ConstString &GetStaticBroadcasterClass();
-
-  ConstString &GetBroadcasterClass() const override {
-    return GetStaticBroadcasterClass();
-  }
-
 protected:
   lldb::ConnectionSP m_connection_sp; ///< The connection that is current in use
                                       ///by this communications class.
-  HostThread m_read_thread; ///< The read thread handle in case we need to
-                            ///cancel the thread.
-  std::atomic<bool> m_read_thread_enabled;
-  std::atomic<bool> m_read_thread_did_exit;
-  std::string
-      m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
-  std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
-                                      ///access to the cached bytes.
-  lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
-                                        ///from read thread.
-  Status m_pass_error; ///< Error passthrough from read thread.
   std::mutex
       m_write_mutex; ///< Don't let multiple threads write at the same time...
-  std::mutex m_synchronize_mutex;
-  ReadThreadBytesReceived m_callback;
-  void *m_callback_baton;
   bool m_close_on_eof;
 
   size_t ReadFromConnection(void *dst, size_t dst_len,
                             const Timeout<std::micro> &timeout,
                             lldb::ConnectionStatus &status, Status *error_ptr);
 
-  /// Append new bytes that get read from the read thread into the internal
-  /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
-  /// event to be broadcast if \a broadcast is true.
-  ///
-  /// Subclasses can override this function in order to inspect the received
-  /// data and check if a packet is available.
-  ///
-  /// Subclasses can also still call this function from the overridden method
-  /// to allow the caching to correctly happen and suppress the broadcasting
-  /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast
-  /// to false.
-  ///
-  /// \param[in] src
-  ///     A source buffer that must be at least \a src_len bytes
-  ///     long.
-  ///
-  /// \param[in] src_len
-  ///     The number of bytes to append to the cache.
-  virtual void AppendBytesToCache(const uint8_t *src, size_t src_len,
-                                  bool broadcast,
-                                  lldb::ConnectionStatus status);
-
-  /// Get any available bytes from our data cache. If this call empties the
-  /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset
-  /// to signify no more bytes are available.
-  ///
-  /// \param[in] dst
-  ///     A destination buffer that must be at least \a dst_len bytes
-  ///     long.
-  ///
-  /// \param[in] dst_len
-  ///     The number of bytes to attempt to read from the cache,
-  ///     and also the max number of bytes that can be placed into
-  ///     \a dst.
-  ///
-  /// \return
-  ///     The number of bytes extracted from the data cache.
-  size_t GetCachedBytes(void *dst, size_t dst_len);
-
 private:
   Communication(const Communication &) = delete;
   const Communication &operator=(const Communication &) = delete;

diff --git a/lldb/include/lldb/Core/ThreadedCommunication.h b/lldb/include/lldb/Core/ThreadedCommunication.h
new file mode 100644
index 0000000000000..b7412c796f107
--- /dev/null
+++ b/lldb/include/lldb/Core/ThreadedCommunication.h
@@ -0,0 +1,288 @@
+//===-- ThreadedCommunication.h ---------------------------------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLDB_CORE_THREADEDCOMMUNICATION_H
+#define LLDB_CORE_THREADEDCOMMUNICATION_H
+
+#include "lldb/Core/Communication.h"
+#include "lldb/Host/HostThread.h"
+#include "lldb/Utility/Broadcaster.h"
+
+#include <atomic>
+#include <mutex>
+#include <string>
+
+#include <cstddef>
+#include <cstdint>
+
+namespace lldb_private {
+
+/// \class ThreadedCommunication ThreadedCommunication.h
+/// "lldb/Core/ThreadedCommunication.h" Variation of Communication that
+/// supports threaded reads.
+///
+/// ThreadedCommunication enhances the base Communication class with support
+/// for multi-threaded mode.  In this mode, a read thread is spawned that
+/// continually reads data and caches any received bytes. To start the read
+/// thread clients call:
+///
+///     bool ThreadedCommunication::StartReadThread (Status *);
+///
+/// If true is returned a read thread has been spawned that will continually
+/// execute a call to the pure virtual DoRead function:
+///
+///     size_t Communication::ReadFromConnection (void *, size_t, uint32_t);
+///
+/// When bytes are received the data gets cached in \a m_bytes and this class
+/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that
+/// want packet based communication should override AppendBytesToCache. The
+/// subclasses can choose to call the built in AppendBytesToCache with the \a
+/// broadcast parameter set to false. This will cause the \b
+/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the
+/// subclass can post a \b eBroadcastBitPacketAvailable event when a full
+/// packet of data has been received.
+///
+/// If the connection is disconnected a \b eBroadcastBitDisconnected event
+/// gets broadcast. If the read thread exits a \b
+/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also
+/// post a \b eBroadcastBitReadThreadShouldExit event to this object which
+/// will cause the read thread to exit.
+///
+/// ThreadedCommunication inherits from Broadcaster which means it can be used
+/// in conjunction with Listener to wait for multiple broadcaster objects and
+/// multiple events from each of those objects. ThreadedCommunication defines a
+/// set of pre-defined event bits (see enumerations definitions that start with
+/// "eBroadcastBit" below).
+class ThreadedCommunication : public Communication, public Broadcaster {
+  using Communication::Communication;
+
+public:
+  FLAGS_ANONYMOUS_ENUM(){
+      eBroadcastBitDisconnected =
+          (1u << 0), ///< Sent when the communications connection is lost.
+      eBroadcastBitReadThreadGotBytes =
+          (1u << 1), ///< Sent by the read thread when bytes become available.
+      eBroadcastBitReadThreadDidExit =
+          (1u
+           << 2), ///< Sent by the read thread when it exits to inform clients.
+      eBroadcastBitReadThreadShouldExit =
+          (1u << 3), ///< Sent by clients that need to cancel the read thread.
+      eBroadcastBitPacketAvailable =
+          (1u << 4), ///< Sent when data received makes a complete packet.
+      eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
+                                                   /// to indicate all pending
+                                                   /// input has been processed.
+  };
+
+  typedef void (*ReadThreadBytesReceived)(void *baton, const void *src,
+                                          size_t src_len);
+
+  /// Construct the ThreadedCommunication object with the specified name for the
+  /// Broadcaster that this object inherits from.
+  ///
+  /// \param[in] broadcaster_name
+  ///     The name of the broadcaster object.  This name should be as
+  ///     complete as possible to uniquely identify this object. The
+  ///     broadcaster name can be updated after the connect function
+  ///     is called.
+  ThreadedCommunication(const char *broadcaster_name);
+
+  /// Destructor.
+  ///
+  /// The destructor is virtual since this class gets subclassed.
+  ~ThreadedCommunication() override;
+
+  void Clear() override;
+
+  /// Disconnect the communications connection if one is currently connected.
+  ///
+  /// \return
+  ///     \b True if the disconnect succeeded, \b false otherwise. The
+  ///     internal error object should be filled in with an
+  ///     appropriate value based on the result of this function.
+  ///
+  /// \see Status& Communication::GetError ();
+  /// \see bool Connection::Disconnect ();
+  lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr) override;
+
+  /// Read bytes from the current connection.
+  ///
+  /// If no read thread is running, this function call the connection's
+  /// Connection::Read(...) function to get any available.
+  ///
+  /// If a read thread has been started, this function will check for any
+  /// cached bytes that have already been read and return any currently
+  /// available bytes. If no bytes are cached, it will wait for the bytes to
+  /// become available by listening for the \a eBroadcastBitReadThreadGotBytes
+  /// event. If this function consumes all of the bytes in the cache, it will
+  /// reset the \a eBroadcastBitReadThreadGotBytes event bit.
+  ///
+  /// \param[in] dst
+  ///     A destination buffer that must be at least \a dst_len bytes
+  ///     long.
+  ///
+  /// \param[in] dst_len
+  ///     The number of bytes to attempt to read, and also the max
+  ///     number of bytes that can be placed into \a dst.
+  ///
+  /// \param[in] timeout
+  ///     A timeout value or llvm::None for no timeout.
+  ///
+  /// \return
+  ///     The number of bytes actually read.
+  ///
+  /// \see size_t Connection::Read (void *, size_t);
+  size_t Read(void *dst, size_t dst_len, const Timeout<std::micro> &timeout,
+              lldb::ConnectionStatus &status, Status *error_ptr) override;
+
+  /// Sets the connection that it to be used by this class.
+  ///
+  /// By making a communication class that uses different connections it
+  /// allows a single communication interface to negotiate and change its
+  /// connection without any interruption to the client. It also allows the
+  /// Communication class to be subclassed for packet based communication.
+  ///
+  /// \param[in] connection
+  ///     A connection that this class will own and destroy.
+  ///
+  /// \see
+  ///     class Connection
+  void SetConnection(std::unique_ptr<Connection> connection) override;
+
+  /// Starts a read thread whose sole purpose it to read bytes from the
+  /// current connection. This function will call connection's read function:
+  ///
+  /// size_t Connection::Read (void *, size_t);
+  ///
+  /// When bytes are read and cached, this function will call:
+  ///
+  /// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len,
+  /// bool
+  /// broadcast);
+  ///
+  /// Subclasses should override this function if they wish to override the
+  /// default action of caching the bytes and broadcasting a \b
+  /// eBroadcastBitReadThreadGotBytes event.
+  ///
+  /// \return
+  ///     \b True if the read thread was successfully started, \b
+  ///     false otherwise.
+  ///
+  /// \see size_t Connection::Read (void *, size_t);
+  /// \see void Communication::AppendBytesToCache (const uint8_t * bytes,
+  ///                                              size_t len, bool broadcast);
+  virtual bool StartReadThread(Status *error_ptr = nullptr);
+
+  /// Stops the read thread by cancelling it.
+  ///
+  /// \return
+  ///     \b True if the read thread was successfully canceled, \b
+  ///     false otherwise.
+  virtual bool StopReadThread(Status *error_ptr = nullptr);
+
+  virtual bool JoinReadThread(Status *error_ptr = nullptr);
+  /// Checks if there is a currently running read thread.
+  ///
+  /// \return
+  ///     \b True if the read thread is running, \b false otherwise.
+  bool ReadThreadIsRunning();
+
+  /// The read thread function. This function will call the "DoRead"
+  /// function continuously and wait for data to become available. When data
+  /// is received it will append the available data to the internal cache and
+  /// broadcast a \b eBroadcastBitReadThreadGotBytes event.
+  ///
+  /// \param[in] comm_ptr
+  ///     A pointer to an instance of this class.
+  ///
+  /// \return
+  ///     \b NULL.
+  ///
+  /// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t);
+  lldb::thread_result_t ReadThread();
+
+  void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
+                                          void *callback_baton);
+
+  /// Wait for the read thread to process all outstanding data.
+  ///
+  /// After this function returns, the read thread has processed all data that
+  /// has been waiting in the Connection queue.
+  ///
+  void SynchronizeWithReadThread();
+
+  static ConstString &GetStaticBroadcasterClass();
+
+  ConstString &GetBroadcasterClass() const override {
+    return GetStaticBroadcasterClass();
+  }
+
+protected:
+  HostThread m_read_thread; ///< The read thread handle in case we need to
+                            /// cancel the thread.
+  std::atomic<bool> m_read_thread_enabled;
+  std::atomic<bool> m_read_thread_did_exit;
+  std::string
+      m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
+  std::recursive_mutex m_bytes_mutex;   ///< A mutex to protect multi-threaded
+                                        /// access to the cached bytes.
+  lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
+                                        /// from read thread.
+  Status m_pass_error;                  ///< Error passthrough from read thread.
+  std::mutex m_synchronize_mutex;
+  ReadThreadBytesReceived m_callback;
+  void *m_callback_baton;
+
+  /// Append new bytes that get read from the read thread into the internal
+  /// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
+  /// event to be broadcast if \a broadcast is true.
+  ///
+  /// Subclasses can override this function in order to inspect the received
+  /// data and check if a packet is available.
+  ///
+  /// Subclasses can also still call this function from the overridden method
+  /// to allow the caching to correctly happen and suppress the broadcasting
+  /// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast
+  /// to false.
+  ///
+  /// \param[in] src
+  ///     A source buffer that must be at least \a src_len bytes
+  ///     long.
+  ///
+  /// \param[in] src_len
+  ///     The number of bytes to append to the cache.
+  virtual void AppendBytesToCache(const uint8_t *src, size_t src_len,
+                                  bool broadcast,
+                                  lldb::ConnectionStatus status);
+
+  /// Get any available bytes from our data cache. If this call empties the
+  /// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset
+  /// to signify no more bytes are available.
+  ///
+  /// \param[in] dst
+  ///     A destination buffer that must be at least \a dst_len bytes
+  ///     long.
+  ///
+  /// \param[in] dst_len
+  ///     The number of bytes to attempt to read from the cache,
+  ///     and also the max number of bytes that can be placed into
+  ///     \a dst.
+  ///
+  /// \return
+  ///     The number of bytes extracted from the data cache.
+  size_t GetCachedBytes(void *dst, size_t dst_len);
+
+private:
+  ThreadedCommunication(const ThreadedCommunication &) = delete;
+  const ThreadedCommunication &
+  operator=(const ThreadedCommunication &) = delete;
+};
+
+} // namespace lldb_private
+
+#endif // LLDB_CORE_THREADEDCOMMUNICATION_H

diff --git a/lldb/include/lldb/Interpreter/ScriptInterpreter.h b/lldb/include/lldb/Interpreter/ScriptInterpreter.h
index 83f784bde712e..cb3cafaf2ed51 100644
--- a/lldb/include/lldb/Interpreter/ScriptInterpreter.h
+++ b/lldb/include/lldb/Interpreter/ScriptInterpreter.h
@@ -13,10 +13,10 @@
 #include "lldb/API/SBError.h"
 #include "lldb/API/SBMemoryRegionInfo.h"
 #include "lldb/Breakpoint/BreakpointOptions.h"
-#include "lldb/Core/Communication.h"
 #include "lldb/Core/PluginInterface.h"
 #include "lldb/Core/SearchFilter.h"
 #include "lldb/Core/StreamFile.h"
+#include "lldb/Core/ThreadedCommunication.h"
 #include "lldb/Host/PseudoTerminal.h"
 #include "lldb/Interpreter/ScriptedProcessInterface.h"
 #include "lldb/Utility/Broadcaster.h"
@@ -119,7 +119,7 @@ class ScriptInterpreterIORedirect {
   lldb::FileSP m_input_file_sp;
   lldb::StreamFileSP m_output_file_sp;
   lldb::StreamFileSP m_error_file_sp;
-  Communication m_communication;
+  ThreadedCommunication m_communication;
   bool m_disconnect;
 };
 

diff --git a/lldb/include/lldb/Target/Process.h b/lldb/include/lldb/Target/Process.h
index 05b0eb6237c71..5bca9f076cb58 100644
--- a/lldb/include/lldb/Target/Process.h
+++ b/lldb/include/lldb/Target/Process.h
@@ -22,10 +22,10 @@
 #include <vector>
 
 #include "lldb/Breakpoint/BreakpointSiteList.h"
-#include "lldb/Core/Communication.h"
 #include "lldb/Core/LoadedModuleInfoList.h"
 #include "lldb/Core/PluginInterface.h"
 #include "lldb/Core/ThreadSafeValue.h"
+#include "lldb/Core/ThreadedCommunication.h"
 #include "lldb/Core/UserSettingsController.h"
 #include "lldb/Host/HostThread.h"
 #include "lldb/Host/ProcessLaunchInfo.h"
@@ -2883,7 +2883,7 @@ void PruneThreadPlans();
       m_unix_signals_sp; /// This is the current signal set for this process.
   lldb::ABISP m_abi_sp;
   lldb::IOHandlerSP m_process_input_reader;
-  Communication m_stdio_communication;
+  ThreadedCommunication m_stdio_communication;
   std::recursive_mutex m_stdio_communication_mutex;
   bool m_stdin_forward; /// Remember if stdin must be forwarded to remote debug
                         /// server

diff --git a/lldb/include/lldb/lldb-forward.h b/lldb/include/lldb/lldb-forward.h
index 9ab7f3c1718ed..2fe6ac58be05c 100644
--- a/lldb/include/lldb/lldb-forward.h
+++ b/lldb/include/lldb/lldb-forward.h
@@ -236,6 +236,7 @@ class ThreadPlanStepThrough;
 class ThreadPlanTracer;
 class ThreadSpec;
 class ThreadPostMortemTrace;
+class ThreadedCommunication;
 class Trace;
 class TraceCursor;
 class TraceExporter;

diff --git a/lldb/source/API/SBCommunication.cpp b/lldb/source/API/SBCommunication.cpp
index 0a1dad1e2e8fb..b9c8b498a38e3 100644
--- a/lldb/source/API/SBCommunication.cpp
+++ b/lldb/source/API/SBCommunication.cpp
@@ -8,7 +8,7 @@
 
 #include "lldb/API/SBCommunication.h"
 #include "lldb/API/SBBroadcaster.h"
-#include "lldb/Core/Communication.h"
+#include "lldb/Core/ThreadedCommunication.h"
 #include "lldb/Host/ConnectionFileDescriptor.h"
 #include "lldb/Host/Host.h"
 #include "lldb/Utility/Instrumentation.h"
@@ -19,7 +19,8 @@ using namespace lldb_private;
 SBCommunication::SBCommunication() { LLDB_INSTRUMENT_VA(this); }
 
 SBCommunication::SBCommunication(const char *broadcaster_name)
-    : m_opaque(new Communication(broadcaster_name)), m_opaque_owned(true) {
+    : m_opaque(new ThreadedCommunication(broadcaster_name)),
+      m_opaque_owned(true) {
   LLDB_INSTRUMENT_VA(this, broadcaster_name);
 }
 
@@ -169,5 +170,5 @@ SBBroadcaster SBCommunication::GetBroadcaster() {
 const char *SBCommunication::GetBroadcasterClass() {
   LLDB_INSTRUMENT();
 
-  return Communication::GetStaticBroadcasterClass().AsCString();
+  return ThreadedCommunication::GetStaticBroadcasterClass().AsCString();
 }

diff  --git a/lldb/source/Core/CMakeLists.txt b/lldb/source/Core/CMakeLists.txt
index d907177586e78..c578656463f8e 100644
--- a/lldb/source/Core/CMakeLists.txt
+++ b/lldb/source/Core/CMakeLists.txt
@@ -54,6 +54,7 @@ add_lldb_library(lldbCore
   SourceManager.cpp
   StreamAsynchronousIO.cpp
   StreamFile.cpp
+  ThreadedCommunication.cpp
   UserSettingsController.cpp
   Value.cpp
   ValueObject.cpp

diff  --git a/lldb/source/Core/Communication.cpp b/lldb/source/Core/Communication.cpp
index 3606d92a9d4e0..5d890632ccc6a 100644
--- a/lldb/source/Core/Communication.cpp
+++ b/lldb/source/Core/Communication.cpp
@@ -8,22 +8,14 @@
 
 #include "lldb/Core/Communication.h"
 
-#include "lldb/Host/HostThread.h"
-#include "lldb/Host/ThreadLauncher.h"
 #include "lldb/Utility/Connection.h"
-#include "lldb/Utility/ConstString.h"
-#include "lldb/Utility/Event.h"
 #include "lldb/Utility/LLDBLog.h"
-#include "lldb/Utility/Listener.h"
 #include "lldb/Utility/Log.h"
 #include "lldb/Utility/Status.h"
 
-#include "llvm/ADT/None.h"
-#include "llvm/ADT/Optional.h"
 #include "llvm/Support/Compiler.h"
 
 #include <algorithm>
-#include <chrono>
 #include <cstring>
 #include <memory>
 
@@ -34,42 +26,15 @@
 using namespace lldb;
 using namespace lldb_private;
 
-ConstString &Communication::GetStaticBroadcasterClass() {
-  static ConstString class_name("lldb.communication");
-  return class_name;
-}
-
-Communication::Communication(const char *name)
-    : Broadcaster(nullptr, name), m_connection_sp(),
-      m_read_thread_enabled(false), m_read_thread_did_exit(false), m_bytes(),
-      m_bytes_mutex(), m_write_mutex(), m_synchronize_mutex(),
-      m_callback(nullptr), m_callback_baton(nullptr), m_close_on_eof(true)
-
-{
-
-  LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
-           "{0} Communication::Communication (name = {1})", this, name);
-
-  SetEventName(eBroadcastBitDisconnected, "disconnected");
-  SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
-  SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
-  SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
-  SetEventName(eBroadcastBitPacketAvailable, "packet available");
-  SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
-
-  CheckInWithManager();
+Communication::Communication()
+    : m_connection_sp(), m_write_mutex(), m_close_on_eof(true) {
 }
 
 Communication::~Communication() {
-  LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
-           "{0} Communication::~Communication (name = {1})", this,
-           GetBroadcasterName().AsCString());
   Clear();
 }
 
 void Communication::Clear() {
-  SetReadThreadBytesReceivedCallback(nullptr, nullptr);
-  StopReadThread(nullptr);
   Disconnect(nullptr);
 }
 
@@ -91,8 +56,6 @@ ConnectionStatus Communication::Disconnect(Status *error_ptr) {
   LLDB_LOG(GetLog(LLDBLog::Communication), "{0} Communication::Disconnect ()",
            this);
 
-  assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
-         "Disconnecting while the read thread is running is racy!");
   lldb::ConnectionSP connection_sp(m_connection_sp);
   if (connection_sp) {
     ConnectionStatus status = connection_sp->Disconnect(error_ptr);
@@ -129,58 +92,6 @@ size_t Communication::Read(void *dst, size_t dst_len,
       "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
       this, dst, dst_len, timeout, m_connection_sp.get());
 
-  if (m_read_thread_enabled) {
-    // We have a dedicated read thread that is getting data for us
-    size_t cached_bytes = GetCachedBytes(dst, dst_len);
-    if (cached_bytes > 0) {
-      status = eConnectionStatusSuccess;
-      return cached_bytes;
-    }
-    if (timeout && timeout->count() == 0) {
-      if (error_ptr)
-        error_ptr->SetErrorString("Timed out.");
-      status = eConnectionStatusTimedOut;
-      return 0;
-    }
-
-    if (!m_connection_sp) {
-      if (error_ptr)
-        error_ptr->SetErrorString("Invalid connection.");
-      status = eConnectionStatusNoConnection;
-      return 0;
-    }
-
-    ListenerSP listener_sp(Listener::MakeListener("Communication::Read"));
-    listener_sp->StartListeningForEvents(
-        this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
-    EventSP event_sp;
-    while (listener_sp->GetEvent(event_sp, timeout)) {
-      const uint32_t event_type = event_sp->GetType();
-      if (event_type & eBroadcastBitReadThreadGotBytes) {
-        return GetCachedBytes(dst, dst_len);
-      }
-
-      if (event_type & eBroadcastBitReadThreadDidExit) {
-        // If the thread exited of its own accord, it either means it
-        // hit an end-of-file condition or an error.
-        status = m_pass_status;
-        if (error_ptr)
-          *error_ptr = std::move(m_pass_error);
-
-        if (GetCloseOnEOF())
-          Disconnect(nullptr);
-        return 0;
-      }
-    }
-
-    if (error_ptr)
-      error_ptr->SetErrorString("Timed out.");
-    status = eConnectionStatusTimedOut;
-    return 0;
-  }
-
-  // We aren't using a read thread, just read the data synchronously in this
-  // thread.
   return ReadFromConnection(dst, dst_len, timeout, status, error_ptr);
 }
 
@@ -213,104 +124,6 @@ size_t Communication::WriteAll(const void *src, size_t src_len,
   return total_written;
 }
 
-bool Communication::StartReadThread(Status *error_ptr) {
-  if (error_ptr)
-    error_ptr->Clear();
-
-  if (m_read_thread.IsJoinable())
-    return true;
-
-  LLDB_LOG(GetLog(LLDBLog::Communication),
-           "{0} Communication::StartReadThread ()", this);
-
-  const std::string thread_name =
-      llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
-
-  m_read_thread_enabled = true;
-  m_read_thread_did_exit = false;
-  auto maybe_thread = ThreadLauncher::LaunchThread(
-      thread_name, [this] { return ReadThread(); });
-  if (maybe_thread) {
-    m_read_thread = *maybe_thread;
-  } else {
-    if (error_ptr)
-      *error_ptr = Status(maybe_thread.takeError());
-    else {
-      LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
-               llvm::toString(maybe_thread.takeError()));
-    }
-  }
-
-  if (!m_read_thread.IsJoinable())
-    m_read_thread_enabled = false;
-
-  return m_read_thread_enabled;
-}
-
-bool Communication::StopReadThread(Status *error_ptr) {
-  if (!m_read_thread.IsJoinable())
-    return true;
-
-  LLDB_LOG(GetLog(LLDBLog::Communication),
-           "{0} Communication::StopReadThread ()", this);
-
-  m_read_thread_enabled = false;
-
-  BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
-
-  // error = m_read_thread.Cancel();
-
-  Status error = m_read_thread.Join(nullptr);
-  return error.Success();
-}
-
-bool Communication::JoinReadThread(Status *error_ptr) {
-  if (!m_read_thread.IsJoinable())
-    return true;
-
-  Status error = m_read_thread.Join(nullptr);
-  return error.Success();
-}
-
-size_t Communication::GetCachedBytes(void *dst, size_t dst_len) {
-  std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
-  if (!m_bytes.empty()) {
-    // If DST is nullptr and we have a thread, then return the number of bytes
-    // that are available so the caller can call again
-    if (dst == nullptr)
-      return m_bytes.size();
-
-    const size_t len = std::min<size_t>(dst_len, m_bytes.size());
-
-    ::memcpy(dst, m_bytes.c_str(), len);
-    m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
-
-    return len;
-  }
-  return 0;
-}
-
-void Communication::AppendBytesToCache(const uint8_t *bytes, size_t len,
-                                       bool broadcast,
-                                       ConnectionStatus status) {
-  LLDB_LOG(GetLog(LLDBLog::Communication),
-           "{0} Communication::AppendBytesToCache (src = {1}, src_len = {2}, "
-           "broadcast = {3})",
-           this, bytes, (uint64_t)len, broadcast);
-  if ((bytes == nullptr || len == 0) &&
-      (status != lldb::eConnectionStatusEndOfFile))
-    return;
-  if (m_callback) {
-    // If the user registered a callback, then call it and do not broadcast
-    m_callback(m_callback_baton, bytes, len);
-  } else if (bytes != nullptr && len > 0) {
-    std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
-    m_bytes.append((const char *)bytes, len);
-    if (broadcast)
-      BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
-  }
-}
-
 size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
                                          const Timeout<std::micro> &timeout,
                                          ConnectionStatus &status,
@@ -325,115 +138,8 @@ size_t Communication::ReadFromConnection(void *dst, size_t dst_len,
   return 0;
 }
 
-bool Communication::ReadThreadIsRunning() { return m_read_thread_enabled; }
-
-lldb::thread_result_t Communication::ReadThread() {
-  Log *log = GetLog(LLDBLog::Communication);
-
-  LLDB_LOG(log, "Communication({0}) thread starting...", this);
-
-  uint8_t buf[1024];
-
-  Status error;
-  ConnectionStatus status = eConnectionStatusSuccess;
-  bool done = false;
-  bool disconnect = false;
-  while (!done && m_read_thread_enabled) {
-    size_t bytes_read = ReadFromConnection(
-        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
-    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
-      AppendBytesToCache(buf, bytes_read, true, status);
-
-    switch (status) {
-    case eConnectionStatusSuccess:
-      break;
-
-    case eConnectionStatusEndOfFile:
-      done = true;
-      disconnect = GetCloseOnEOF();
-      break;
-    case eConnectionStatusError: // Check GetError() for details
-      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
-        // EIO on a pipe is usually caused by remote shutdown
-        disconnect = GetCloseOnEOF();
-        done = true;
-      }
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    case eConnectionStatusInterrupted: // Synchronization signal from
-                                       // SynchronizeWithReadThread()
-      // The connection returns eConnectionStatusInterrupted only when there is
-      // no input pending to be read, so we can signal that.
-      BroadcastEvent(eBroadcastBitNoMorePendingInput);
-      break;
-    case eConnectionStatusNoConnection:   // No connection
-    case eConnectionStatusLostConnection: // Lost connection while connected to
-                                          // a valid connection
-      done = true;
-      [[fallthrough]];
-    case eConnectionStatusTimedOut: // Request timed out
-      if (error.Fail())
-        LLDB_LOG(log, "error: {0}, status = {1}", error,
-                 Communication::ConnectionStatusAsString(status));
-      break;
-    }
-  }
-  m_pass_status = status;
-  m_pass_error = std::move(error);
-  LLDB_LOG(log, "Communication({0}) thread exiting...", this);
-
-  // Handle threads wishing to synchronize with us.
-  {
-    // Prevent new ones from showing up.
-    m_read_thread_did_exit = true;
-
-    // Unblock any existing thread waiting for the synchronization signal.
-    BroadcastEvent(eBroadcastBitNoMorePendingInput);
-
-    // Wait for the thread to finish...
-    std::lock_guard<std::mutex> guard(m_synchronize_mutex);
-    // ... and disconnect.
-    if (disconnect)
-      Disconnect();
-  }
-
-  // Let clients know that this thread is exiting
-  BroadcastEvent(eBroadcastBitReadThreadDidExit);
-  return {};
-}
-
-void Communication::SetReadThreadBytesReceivedCallback(
-    ReadThreadBytesReceived callback, void *callback_baton) {
-  m_callback = callback;
-  m_callback_baton = callback_baton;
-}
-
-void Communication::SynchronizeWithReadThread() {
-  // Only one thread can do the synchronization dance at a time.
-  std::lock_guard<std::mutex> guard(m_synchronize_mutex);
-
-  // First start listening for the synchronization event.
-  ListenerSP listener_sp(
-      Listener::MakeListener("Communication::SyncronizeWithReadThread"));
-  listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
-
-  // If the thread is not running, there is no point in synchronizing.
-  if (!m_read_thread_enabled || m_read_thread_did_exit)
-    return;
-
-  // Notify the read thread.
-  m_connection_sp->InterruptRead();
-
-  // Wait for the synchronization event.
-  EventSP event_sp;
-  listener_sp->GetEvent(event_sp, llvm::None);
-}
-
 void Communication::SetConnection(std::unique_ptr<Connection> connection) {
   Disconnect(nullptr);
-  StopReadThread(nullptr);
   m_connection_sp = std::move(connection);
 }
 

diff  --git a/lldb/source/Core/ThreadedCommunication.cpp b/lldb/source/Core/ThreadedCommunication.cpp
new file mode 100644
index 0000000000000..ab89b474769a5
--- /dev/null
+++ b/lldb/source/Core/ThreadedCommunication.cpp
@@ -0,0 +1,353 @@
+//===-- ThreadedCommunication.cpp -----------------------------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "lldb/Core/ThreadedCommunication.h"
+
+#include "lldb/Host/ThreadLauncher.h"
+#include "lldb/Utility/Connection.h"
+#include "lldb/Utility/ConstString.h"
+#include "lldb/Utility/Event.h"
+#include "lldb/Utility/LLDBLog.h"
+#include "lldb/Utility/Listener.h"
+#include "lldb/Utility/Log.h"
+#include "lldb/Utility/Status.h"
+
+#include "llvm/ADT/None.h"
+#include "llvm/Support/Compiler.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstring>
+#include <memory>
+
+#include <cerrno>
+#include <cinttypes>
+#include <cstdio>
+
+using namespace lldb;
+using namespace lldb_private;
+
+ConstString &ThreadedCommunication::GetStaticBroadcasterClass() {
+  static ConstString class_name("lldb.communication");
+  return class_name;
+}
+
+ThreadedCommunication::ThreadedCommunication(const char *name)
+    : Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),
+      m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),
+      m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {
+  LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
+           "{0} ThreadedCommunication::ThreadedCommunication (name = {1})",
+           this, name);
+
+  SetEventName(eBroadcastBitDisconnected, "disconnected");
+  SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");
+  SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");
+  SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");
+  SetEventName(eBroadcastBitPacketAvailable, "packet available");
+  SetEventName(eBroadcastBitNoMorePendingInput, "no more pending input");
+
+  CheckInWithManager();
+}
+
+ThreadedCommunication::~ThreadedCommunication() {
+  LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),
+           "{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",
+           this, GetBroadcasterName().AsCString());
+}
+
+void ThreadedCommunication::Clear() {
+  SetReadThreadBytesReceivedCallback(nullptr, nullptr);
+  StopReadThread(nullptr);
+  Communication::Clear();
+}
+
+ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {
+  assert((!m_read_thread_enabled || m_read_thread_did_exit) &&
+         "Disconnecting while the read thread is running is racy!");
+  return Communication::Disconnect(error_ptr);
+}
+
+size_t ThreadedCommunication::Read(void *dst, size_t dst_len,
+                                   const Timeout<std::micro> &timeout,
+                                   ConnectionStatus &status,
+                                   Status *error_ptr) {
+  Log *log = GetLog(LLDBLog::Communication);
+  LLDB_LOG(
+      log,
+      "this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",
+      this, dst, dst_len, timeout, m_connection_sp.get());
+
+  if (m_read_thread_enabled) {
+    // We have a dedicated read thread that is getting data for us
+    size_t cached_bytes = GetCachedBytes(dst, dst_len);
+    if (cached_bytes > 0) {
+      status = eConnectionStatusSuccess;
+      return cached_bytes;
+    }
+    if (timeout && timeout->count() == 0) {
+      if (error_ptr)
+        error_ptr->SetErrorString("Timed out.");
+      status = eConnectionStatusTimedOut;
+      return 0;
+    }
+
+    if (!m_connection_sp) {
+      if (error_ptr)
+        error_ptr->SetErrorString("Invalid connection.");
+      status = eConnectionStatusNoConnection;
+      return 0;
+    }
+
+    ListenerSP listener_sp(
+        Listener::MakeListener("ThreadedCommunication::Read"));
+    listener_sp->StartListeningForEvents(
+        this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);
+    EventSP event_sp;
+    while (listener_sp->GetEvent(event_sp, timeout)) {
+      const uint32_t event_type = event_sp->GetType();
+      if (event_type & eBroadcastBitReadThreadGotBytes) {
+        return GetCachedBytes(dst, dst_len);
+      }
+
+      if (event_type & eBroadcastBitReadThreadDidExit) {
+        // If the thread exited of its own accord, it either means it
+        // hit an end-of-file condition or an error.
+        status = m_pass_status;
+        if (error_ptr)
+          *error_ptr = std::move(m_pass_error);
+
+        if (GetCloseOnEOF())
+          Disconnect(nullptr);
+        return 0;
+      }
+    }
+
+    if (error_ptr)
+      error_ptr->SetErrorString("Timed out.");
+    status = eConnectionStatusTimedOut;
+    return 0;
+  }
+
+  // We aren't using a read thread, just read the data synchronously in this
+  // thread.
+  return Communication::Read(dst, dst_len, timeout, status, error_ptr);
+}
+
+bool ThreadedCommunication::StartReadThread(Status *error_ptr) {
+  if (error_ptr)
+    error_ptr->Clear();
+
+  if (m_read_thread.IsJoinable())
+    return true;
+
+  LLDB_LOG(GetLog(LLDBLog::Communication),
+           "{0} ThreadedCommunication::StartReadThread ()", this);
+
+  const std::string thread_name =
+      llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());
+
+  m_read_thread_enabled = true;
+  m_read_thread_did_exit = false;
+  auto maybe_thread = ThreadLauncher::LaunchThread(
+      thread_name, [this] { return ReadThread(); });
+  if (maybe_thread) {
+    m_read_thread = *maybe_thread;
+  } else {
+    if (error_ptr)
+      *error_ptr = Status(maybe_thread.takeError());
+    else {
+      LLDB_LOG(GetLog(LLDBLog::Host), "failed to launch host thread: {}",
+               llvm::toString(maybe_thread.takeError()));
+    }
+  }
+
+  if (!m_read_thread.IsJoinable())
+    m_read_thread_enabled = false;
+
+  return m_read_thread_enabled;
+}
+
+bool ThreadedCommunication::StopReadThread(Status *error_ptr) {
+  if (!m_read_thread.IsJoinable())
+    return true;
+
+  LLDB_LOG(GetLog(LLDBLog::Communication),
+           "{0} ThreadedCommunication::StopReadThread ()", this);
+
+  m_read_thread_enabled = false;
+
+  BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);
+
+  // error = m_read_thread.Cancel();
+
+  Status error = m_read_thread.Join(nullptr);
+  return error.Success();
+}
+
+bool ThreadedCommunication::JoinReadThread(Status *error_ptr) {
+  if (!m_read_thread.IsJoinable())
+    return true;
+
+  Status error = m_read_thread.Join(nullptr);
+  return error.Success();
+}
+
+size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {
+  std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
+  if (!m_bytes.empty()) {
+    // If DST is nullptr and we have a thread, then return the number of bytes
+    // that are available so the caller can call again
+    if (dst == nullptr)
+      return m_bytes.size();
+
+    const size_t len = std::min<size_t>(dst_len, m_bytes.size());
+
+    ::memcpy(dst, m_bytes.c_str(), len);
+    m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);
+
+    return len;
+  }
+  return 0;
+}
+
+void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,
+                                               bool broadcast,
+                                               ConnectionStatus status) {
+  LLDB_LOG(GetLog(LLDBLog::Communication),
+           "{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "
+           "= {2}, "
+           "broadcast = {3})",
+           this, bytes, (uint64_t)len, broadcast);
+  if ((bytes == nullptr || len == 0) &&
+      (status != lldb::eConnectionStatusEndOfFile))
+    return;
+  if (m_callback) {
+    // If the user registered a callback, then call it and do not broadcast
+    m_callback(m_callback_baton, bytes, len);
+  } else if (bytes != nullptr && len > 0) {
+    std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);
+    m_bytes.append((const char *)bytes, len);
+    if (broadcast)
+      BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);
+  }
+}
+
+bool ThreadedCommunication::ReadThreadIsRunning() {
+  return m_read_thread_enabled;
+}
+
+lldb::thread_result_t ThreadedCommunication::ReadThread() {
+  Log *log = GetLog(LLDBLog::Communication);
+
+  LLDB_LOG(log, "Communication({0}) thread starting...", this);
+
+  uint8_t buf[1024];
+
+  Status error;
+  ConnectionStatus status = eConnectionStatusSuccess;
+  bool done = false;
+  bool disconnect = false;
+  while (!done && m_read_thread_enabled) {
+    size_t bytes_read = ReadFromConnection(
+        buf, sizeof(buf), std::chrono::seconds(5), status, &error);
+    if (bytes_read > 0 || status == eConnectionStatusEndOfFile)
+      AppendBytesToCache(buf, bytes_read, true, status);
+
+    switch (status) {
+    case eConnectionStatusSuccess:
+      break;
+
+    case eConnectionStatusEndOfFile:
+      done = true;
+      disconnect = GetCloseOnEOF();
+      break;
+    case eConnectionStatusError: // Check GetError() for details
+      if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {
+        // EIO on a pipe is usually caused by remote shutdown
+        disconnect = GetCloseOnEOF();
+        done = true;
+      }
+      if (error.Fail())
+        LLDB_LOG(log, "error: {0}, status = {1}", error,
+                 ThreadedCommunication::ConnectionStatusAsString(status));
+      break;
+    case eConnectionStatusInterrupted: // Synchronization signal from
+                                       // SynchronizeWithReadThread()
+      // The connection returns eConnectionStatusInterrupted only when there is
+      // no input pending to be read, so we can signal that.
+      BroadcastEvent(eBroadcastBitNoMorePendingInput);
+      break;
+    case eConnectionStatusNoConnection:   // No connection
+    case eConnectionStatusLostConnection: // Lost connection while connected to
+                                          // a valid connection
+      done = true;
+      [[fallthrough]];
+    case eConnectionStatusTimedOut: // Request timed out
+      if (error.Fail())
+        LLDB_LOG(log, "error: {0}, status = {1}", error,
+                 ThreadedCommunication::ConnectionStatusAsString(status));
+      break;
+    }
+  }
+  m_pass_status = status;
+  m_pass_error = std::move(error);
+  LLDB_LOG(log, "Communication({0}) thread exiting...", this);
+
+  // Handle threads wishing to synchronize with us.
+  {
+    // Prevent new ones from showing up.
+    m_read_thread_did_exit = true;
+
+    // Unblock any existing thread waiting for the synchronization signal.
+    BroadcastEvent(eBroadcastBitNoMorePendingInput);
+
+    // Wait for the thread to finish...
+    std::lock_guard<std::mutex> guard(m_synchronize_mutex);
+    // ... and disconnect.
+    if (disconnect)
+      Disconnect();
+  }
+
+  // Let clients know that this thread is exiting
+  BroadcastEvent(eBroadcastBitReadThreadDidExit);
+  return {};
+}
+
+void ThreadedCommunication::SetReadThreadBytesReceivedCallback(
+    ReadThreadBytesReceived callback, void *callback_baton) {
+  m_callback = callback;
+  m_callback_baton = callback_baton;
+}
+
+void ThreadedCommunication::SynchronizeWithReadThread() {
+  // Only one thread can do the synchronization dance at a time.
+  std::lock_guard<std::mutex> guard(m_synchronize_mutex);
+
+  // First start listening for the synchronization event.
+  ListenerSP listener_sp(Listener::MakeListener(
+      "ThreadedCommunication::SyncronizeWithReadThread"));
+  listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);
+
+  // If the thread is not running, there is no point in synchronizing.
+  if (!m_read_thread_enabled || m_read_thread_did_exit)
+    return;
+
+  // Notify the read thread.
+  m_connection_sp->InterruptRead();
+
+  // Wait for the synchronization event.
+  EventSP event_sp;
+  listener_sp->GetEvent(event_sp, llvm::None);
+}
+
+void ThreadedCommunication::SetConnection(
+    std::unique_ptr<Connection> connection) {
+  StopReadThread(nullptr);
+  Communication::SetConnection(std::move(connection));
+}

diff  --git a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
index 343ab6bb17900..a63b437befc9a 100644
--- a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
+++ b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.cpp
@@ -29,7 +29,7 @@ using namespace lldb_private;
 
 // CommunicationKDP constructor
 CommunicationKDP::CommunicationKDP(const char *comm_name)
-    : Communication(comm_name), m_addr_byte_size(4),
+    : Communication(), m_addr_byte_size(4),
       m_byte_order(eByteOrderLittle), m_packet_timeout(5), m_sequence_mutex(),
       m_is_running(false), m_session_key(0u), m_request_sequence_id(0u),
       m_exception_sequence_id(0u), m_kdp_version_version(0u),

diff  --git a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
index c70f7de5bdcf5..f7ab9a176b475 100644
--- a/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
+++ b/lldb/source/Plugins/Process/MacOSX-Kernel/CommunicationKDP.h
@@ -21,8 +21,6 @@
 
 class CommunicationKDP : public lldb_private::Communication {
 public:
-  enum { eBroadcastBitRunPacketSent = kLoUserBroadcastBit };
-
   const static uint32_t kMaxPacketSize = 1200;
   const static uint32_t kMaxDataSize = 1024;
   typedef lldb_private::StreamBuffer<4096> PacketStreamType;

diff  --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
index 616de5148a5c1..33e041bb6e3e8 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.cpp
@@ -60,7 +60,7 @@ using namespace lldb_private::process_gdb_remote;
 // GDBRemoteCommunication constructor
 GDBRemoteCommunication::GDBRemoteCommunication(const char *comm_name,
                                                const char *listener_name)
-    : Communication(comm_name),
+    : Communication(), Broadcaster(nullptr, comm_name),
 #ifdef LLDB_CONFIGURATION_DEBUG
       m_packet_timeout(1000),
 #else

diff  --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
index 35e86c202b5b3..22737ab726c7e 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunication.h
@@ -80,10 +80,10 @@ enum GDBErrno {
 
 class ProcessGDBRemote;
 
-class GDBRemoteCommunication : public Communication {
+class GDBRemoteCommunication : public Communication, public Broadcaster {
 public:
   enum {
-    eBroadcastBitRunPacketSent = kLoUserBroadcastBit,
+    eBroadcastBitRunPacketSent = (1u << 0),
   };
 
   enum class PacketType { Invalid = 0, Standard, Notify };
@@ -180,6 +180,8 @@ class GDBRemoteCommunication : public Communication {
                       // false if this class represents a debug session for
                       // a single process
 
+  std::string m_bytes;
+  std::recursive_mutex m_bytes_mutex;
   CompressionType m_compression_type;
 
   PacketResult SendPacketNoLock(llvm::StringRef payload);

diff  --git a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
index bc9ccad29afb1..6ca5c41cc1d01 100644
--- a/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/GDBRemoteCommunicationServerLLGS.cpp
@@ -74,7 +74,7 @@ GDBRemoteCommunicationServerLLGS::GDBRemoteCommunicationServerLLGS(
                                          "gdb-remote.server.rx_packet"),
       m_mainloop(mainloop), m_process_factory(process_factory),
       m_current_process(nullptr), m_continue_process(nullptr),
-      m_stdio_communication("process.stdio") {
+      m_stdio_communication() {
   RegisterPacketHandlers();
 }
 

diff  --git a/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp b/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
index 37272d3afabd3..c0765dcc65a2c 100644
--- a/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
+++ b/lldb/source/Plugins/Process/gdb-remote/ProcessGDBRemote.cpp
@@ -285,14 +285,6 @@ ProcessGDBRemote::ProcessGDBRemote(lldb::TargetSP target_sp,
               __FUNCTION__);
   }
 
-  const uint32_t gdb_event_mask = Communication::eBroadcastBitReadThreadDidExit;
-  if (m_async_listener_sp->StartListeningForEvents(
-          &m_gdb_comm, gdb_event_mask) != gdb_event_mask) {
-    LLDB_LOGF(log,
-              "ProcessGDBRemote::%s failed to listen for m_gdb_comm events",
-              __FUNCTION__);
-  }
-
   const uint64_t timeout_seconds =
       GetGlobalPluginProperties().GetPacketTimeout();
   if (timeout_seconds > 0)
@@ -3567,21 +3559,6 @@ thread_result_t ProcessGDBRemote::AsyncThread() {
           done = true;
           break;
 
-        default:
-          LLDB_LOGF(log,
-                    "ProcessGDBRemote::%s(pid = %" PRIu64
-                    ") got unknown event 0x%8.8x",
-                    __FUNCTION__, GetID(), event_type);
-          done = true;
-          break;
-        }
-      } else if (event_sp->BroadcasterIs(&m_gdb_comm)) {
-        switch (event_type) {
-        case Communication::eBroadcastBitReadThreadDidExit:
-          SetExitStatus(-1, "lost connection");
-          done = true;
-          break;
-
         default:
           LLDB_LOGF(log,
                     "ProcessGDBRemote::%s(pid = %" PRIu64

diff  --git a/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp b/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
index d530936484b96..d0f67a5684c5e 100644
--- a/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
+++ b/lldb/source/Plugins/ScriptInterpreter/Python/ScriptInterpreterPython.cpp
@@ -25,9 +25,9 @@
 #include "lldb/API/SBValue.h"
 #include "lldb/Breakpoint/StoppointCallbackContext.h"
 #include "lldb/Breakpoint/WatchpointOptions.h"
-#include "lldb/Core/Communication.h"
 #include "lldb/Core/Debugger.h"
 #include "lldb/Core/PluginManager.h"
+#include "lldb/Core/ThreadedCommunication.h"
 #include "lldb/Core/ValueObject.h"
 #include "lldb/DataFormatters/TypeSummary.h"
 #include "lldb/Host/FileSystem.h"

diff  --git a/lldb/unittests/Core/CommunicationTest.cpp b/lldb/unittests/Core/CommunicationTest.cpp
index a6da6302e5e14..19ca4cb3d2493 100644
--- a/lldb/unittests/Core/CommunicationTest.cpp
+++ b/lldb/unittests/Core/CommunicationTest.cpp
@@ -7,6 +7,7 @@
 //===----------------------------------------------------------------------===//
 
 #include "lldb/Core/Communication.h"
+#include "lldb/Core/ThreadedCommunication.h"
 #include "lldb/Host/Config.h"
 #include "lldb/Host/ConnectionFileDescriptor.h"
 #include "lldb/Host/Pipe.h"
@@ -37,7 +38,7 @@ static void CommunicationReadTest(bool use_read_thread) {
   ASSERT_THAT_ERROR(a->Write("test", num_bytes).ToError(), llvm::Succeeded());
   ASSERT_EQ(num_bytes, 4U);
 
-  Communication comm("test");
+  ThreadedCommunication comm("test");
   comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
   comm.SetCloseOnEOF(true);
 
@@ -118,7 +119,7 @@ TEST_F(CommunicationTest, SynchronizeWhileClosing) {
   std::unique_ptr<TCPSocket> a, b;
   ASSERT_TRUE(CreateTCPConnectedSockets("localhost", &a, &b));
 
-  Communication comm("test");
+  ThreadedCommunication comm("test");
   comm.SetConnection(std::make_unique<ConnectionFileDescriptor>(b.release()));
   comm.SetCloseOnEOF(true);
   ASSERT_TRUE(comm.StartReadThread());
@@ -146,7 +147,7 @@ TEST_F(CommunicationTest, WriteAll) {
 
   ConnectionFileDescriptor read_conn{pipe.ReleaseReadFileDescriptor(),
                                      /*owns_fd=*/true};
-  Communication write_comm("test");
+  Communication write_comm;
   write_comm.SetConnection(
       std::make_unique<ConnectionFileDescriptor>(write_fd, /*owns_fd=*/true));
 
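For readers who want the shape of the split without the LLDB plumbing, the following is a minimal, self-contained sketch of the pattern the diff above applies: a base class that only reads synchronously, and a subclass that adds a read thread and overrides Read() to serve cached bytes. It is not the LLDB code. FakeConnection, BasicComm, ThreadedComm, demo_sync and demo_threaded are hypothetical names made up for illustration, and the sketch assumes that StartReadThread(), StopReadThread() and Read() are all called from a single owning thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for a connection: a thread-safe byte buffer.
class FakeConnection {
public:
  void Push(const std::string &bytes) {
    std::lock_guard<std::mutex> guard(m_mutex);
    m_data += bytes;
    m_cv.notify_all();
  }
  void Close() {
    std::lock_guard<std::mutex> guard(m_mutex);
    m_closed = true;
    m_cv.notify_all();
  }
  // Blocks until bytes arrive or the connection is closed.
  // Returns an empty string once closed and drained (end of file).
  std::string Pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return !m_data.empty() || m_closed; });
    std::string out;
    out.swap(m_data);
    return out;
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::string m_data;
  bool m_closed = false;
};

// Base class: synchronous reads only, no thread, no cache.
class BasicComm {
public:
  explicit BasicComm(FakeConnection &conn) : m_conn(conn) {}
  virtual ~BasicComm() = default;
  virtual std::string Read() { return m_conn.Pop(); }

protected:
  FakeConnection &m_conn;
};

// Subclass: owns the read thread and the byte cache.
class ThreadedComm : public BasicComm {
public:
  using BasicComm::BasicComm;
  ~ThreadedComm() override { StopReadThread(); }

  void StartReadThread() {
    if (!m_thread.joinable())
      m_thread = std::thread([this] { ReadThread(); });
  }
  void StopReadThread() {
    if (!m_thread.joinable())
      return;
    m_conn.Close(); // unblocks the pending Pop() in the read thread
    m_thread.join();
  }
  std::string Read() override {
    // Without a read thread, fall back to the base class behaviour.
    if (!m_thread.joinable())
      return BasicComm::Read();
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait(lock, [this] { return !m_cache.empty() || m_exited; });
    std::string out;
    out.swap(m_cache);
    return out;
  }

private:
  void ReadThread() {
    for (;;) {
      std::string bytes = m_conn.Pop();
      std::lock_guard<std::mutex> guard(m_mutex);
      if (bytes.empty()) { // end of file: wake any waiting reader and exit
        m_exited = true;
        m_cv.notify_all();
        return;
      }
      m_cache += bytes;
      m_cv.notify_all();
    }
  }

  std::thread m_thread;
  std::mutex m_mutex;
  std::condition_variable m_cv;
  std::string m_cache;
  bool m_exited = false;
};

// Synchronous consumer: uses only the simplified base class.
std::string demo_sync() {
  FakeConnection conn;
  BasicComm comm(conn);
  conn.Push("test");
  return comm.Read();
}

// Threaded consumer: bytes travel through the read thread's cache.
std::string demo_threaded() {
  FakeConnection conn;
  ThreadedComm comm(conn);
  comm.StartReadThread();
  conn.Push("test");
  return comm.Read();
}
```

In this sketch a consumer that never starts a read thread can hold a BasicComm and carries none of the thread, cache or synchronization state, which is the same motivation the log message gives for the split. Building it will probably require thread support to be enabled in the toolchain (for example -pthread with GCC or Clang on some platforms).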


        

