[llvm-branch-commits] [lldb] b6e04ac - [lldb/test] Avoid the socket "pump" thread

Pavel Labath via llvm-branch-commits llvm-branch-commits at lists.llvm.org
Mon Nov 30 00:14:22 PST 2020


Author: Pavel Labath
Date: 2020-11-30T09:07:45+01:00
New Revision: b6e04ac5aa881c1fbb66da884b04e48dfb102474

URL: https://github.com/llvm/llvm-project/commit/b6e04ac5aa881c1fbb66da884b04e48dfb102474
DIFF: https://github.com/llvm/llvm-project/commit/b6e04ac5aa881c1fbb66da884b04e48dfb102474.diff

LOG: [lldb/test] Avoid the socket "pump" thread

A separate thread is not necessary, as we can do its work on the main
thread, while waiting for the packet to arrive. This makes the code
easier to understand and debug (other simplifications are possible too,
but I'll leave that for separate patches). The new implementation also
avoids busy waiting.

Added: 
    

Modified: 
    lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
    lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py

Removed: 
    


################################################################################
diff  --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
index 0c01bdfb1c69..b5c635a77b5c 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
@@ -236,7 +236,7 @@ def expect_lldb_gdbserver_replay(
                 if sequence_entry.is_output_matcher():
                     try:
                         # Grab next entry from the output queue.
-                        content = pump_queues.output_queue().get(True, timeout_seconds)
+                        content = pump.get_output(timeout_seconds)
                     except queue.Empty:
                         if logger:
                             logger.warning(
@@ -247,7 +247,7 @@ def expect_lldb_gdbserver_replay(
                                 pump.get_accumulated_output()))
                 else:
                     try:
-                        content = pump_queues.packet_queue().get(True, timeout_seconds)
+                        content = pump.get_packet(timeout_seconds)
                     except queue.Empty:
                         if logger:
                             logger.warning(

diff  --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
index 3de76345896d..6c41ed473b45 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
@@ -5,6 +5,7 @@
 import re
 import select
 import threading
+import time
 import traceback
 
 from six.moves import queue
@@ -74,8 +75,6 @@ def __init__(self, pump_socket, pump_queues, logger=None):
         if not pump_socket:
             raise Exception("pump_socket cannot be None")
 
-        self._thread = None
-        self._stop_thread = False
         self._socket = pump_socket
         self._logger = logger
         self._receive_buffer = ""
@@ -83,29 +82,42 @@ def __init__(self, pump_socket, pump_queues, logger=None):
         self._pump_queues = pump_queues
 
     def __enter__(self):
-        """Support the python 'with' statement.
-
-        Start the pump thread."""
-        self.start_pump_thread()
+        self._receive_buffer = ""
+        self._accumulated_output = ""
         return self
 
     def __exit__(self, exit_type, value, the_traceback):
-        """Support the python 'with' statement.
-
-        Shut down the pump thread."""
-        self.stop_pump_thread()
+        pass
+
+    def _read(self, timeout_seconds, q):
+        now = time.monotonic()
+        deadline = now + timeout_seconds
+        while q.empty() and now <= deadline:
+            can_read, _, _ = select.select([self._socket], [], [], deadline-now)
+            now = time.monotonic()
+            if can_read and self._socket in can_read:
+                try:
+                    new_bytes = seven.bitcast_to_string(self._socket.recv(4096))
+                    if self._logger and new_bytes and len(new_bytes) > 0:
+                        self._logger.debug(
+                            "pump received bytes: {}".format(new_bytes))
+                except:
+                    # Likely a closed socket.  Done with the pump thread.
+                    if self._logger:
+                        self._logger.debug(
+                            "socket read failed, stopping pump read thread\n" +
+                            traceback.format_exc(3))
+                        break
+                self._process_new_bytes(new_bytes)
+        if q.empty():
+            raise queue.Empty()
+        return q.get(True)
 
-    def start_pump_thread(self):
-        if self._thread:
-            raise Exception("pump thread is already running")
-        self._stop_thread = False
-        self._thread = threading.Thread(target=self._run_method)
-        self._thread.start()
+    def get_output(self, timeout_seconds):
+        return self._read(timeout_seconds, self._pump_queues.output_queue())
 
-    def stop_pump_thread(self):
-        self._stop_thread = True
-        if self._thread:
-            self._thread.join()
+    def get_packet(self, timeout_seconds):
+        return self._read(timeout_seconds, self._pump_queues.packet_queue())
 
     def _process_new_bytes(self, new_bytes):
         if not new_bytes:
@@ -162,34 +174,6 @@ def _process_new_bytes(self, new_bytes):
                     # packet. Stop trying until we read more.
                     has_more = False
 
-    def _run_method(self):
-        self._receive_buffer = ""
-        self._accumulated_output = ""
-
-        if self._logger:
-            self._logger.info("socket pump starting")
-
-        # Keep looping around until we're asked to stop the thread.
-        while not self._stop_thread:
-            can_read, _, _ = select.select([self._socket], [], [], 0)
-            if can_read and self._socket in can_read:
-                try:
-                    new_bytes = seven.bitcast_to_string(self._socket.recv(4096))
-                    if self._logger and new_bytes and len(new_bytes) > 0:
-                        self._logger.debug(
-                            "pump received bytes: {}".format(new_bytes))
-                except:
-                    # Likely a closed socket.  Done with the pump thread.
-                    if self._logger:
-                        self._logger.debug(
-                            "socket read failed, stopping pump read thread\n" +
-                            traceback.format_exc(3))
-                    break
-                self._process_new_bytes(new_bytes)
-
-        if self._logger:
-            self._logger.info("socket pump exiting")
-
     def get_accumulated_output(self):
         return self._accumulated_output
 


        


More information about the llvm-branch-commits mailing list