[Lldb-commits] [lldb] a2f4f7d - [lldb/test] Refactor socket_packet_pump

Pavel Labath via lldb-commits lldb-commits at lists.llvm.org
Mon Dec 7 00:28:55 PST 2020


Author: Pavel Labath
Date: 2020-12-07T09:24:13+01:00
New Revision: a2f4f7daf76c767efd668390bc7f8b99bdb1218c

URL: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c
DIFF: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c.diff

LOG: [lldb/test] Refactor socket_packet_pump

Now that the class does not use a thread, the name is no longer
appropriate. Rename the class to "Server" and make it a long-lived
object (instead of recreating it for every expect_gdbremote_sequence
call). The idea is to make this class a wrapper for all communication
with debug/lldb-server. This will enable some additional cleanups as we
had some duplication between socket_pump and non-pump code paths.

Also squeeze in some small improvements:
- use python-level timeouts on sockets instead of the manual select
  calls
- use byte arrays instead of strings when working with raw packets

Added: 
    

Modified: 
    lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
    lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
    lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py

Removed: 
    lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py


################################################################################
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
index 2f278289988c..d7bfb7fbda32 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
@@ -87,7 +87,6 @@ def setUp(self):
 
         self.setUpBaseLogging()
         self.debug_monitor_extra_args = []
-        self._pump_queues = socket_packet_pump.PumpQueues()
 
         if self.isVerboseLoggingRequested():
             # If requested, full logs go to a log file
@@ -118,8 +117,6 @@ def setUp(self):
             self.stub_hostname = "localhost"
 
     def tearDown(self):
-        self._pump_queues.verify_queues_empty()
-
         self.logger.removeHandler(self._verbose_log_handler)
         self._verbose_log_handler = None
         TestBase.tearDown(self)
@@ -342,6 +339,7 @@ def launch_debug_monitor(self, attach_pid=None, logfile=None):
 
         if self.reverse_connect:
             self.sock = sock.accept()[0]
+            self.sock.settimeout(self.DEFAULT_TIMEOUT)
 
         return server
 
@@ -354,6 +352,7 @@ def connect_to_debug_monitor(self, attach_pid=None):
             # Schedule debug monitor to be shut down during teardown.
             logger = self.logger
 
+            self._server = Server(self.sock, server)
             return server
 
         # We're using a random port algorithm to try not to collide with other ports,
@@ -375,6 +374,7 @@ def connect_to_debug_monitor(self, attach_pid=None):
                 try:
                     logger.info("Connect attempt %d", connect_attemps + 1)
                     self.sock = self.create_socket()
+                    self._server = Server(self.sock, server)
                     return server
                 except _ConnectionRefused as serr:
                     # Ignore, and try again.
@@ -632,9 +632,8 @@ def parse_register_info_packets(self, context):
     def expect_gdbremote_sequence(self):
         return expect_lldb_gdbserver_replay(
             self,
-            self.sock,
+            self._server,
             self.test_sequence,
-            self._pump_queues,
             self.DEFAULT_TIMEOUT * len(self.test_sequence),
             self.logger)
 

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
index b5c635a77b5c..07136108b2a4 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
@@ -3,19 +3,18 @@
 
 from __future__ import division, print_function
 
-
+import binascii
 import os
 import os.path
 import platform
 import re
 import six
-import socket_packet_pump
+import socket
 import subprocess
+from lldbsuite.support import seven
 from lldbsuite.test.lldbtest import *
 from lldbsuite.test import configuration
-
-from six.moves import queue
-
+from textwrap import dedent
 
 def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename):
     """Return the debug monitor exe path given the lldb exe path.
@@ -165,9 +164,8 @@ def assert_packets_equal(asserter, actual_packet, expected_packet):
 
 def expect_lldb_gdbserver_replay(
         asserter,
-        sock,
+        server,
         test_sequence,
-        pump_queues,
         timeout_seconds,
         logger=None):
     """Replay socket communication with lldb-gdbserver and verify responses.
@@ -175,8 +173,6 @@ def expect_lldb_gdbserver_replay(
     Args:
         asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance.
 
-        sock: the TCP socket connected to the lldb-gdbserver exe.
-
         test_sequence: a GdbRemoteTestSequence instance that describes
             the messages sent to the gdb remote and the responses
             expected from it.
@@ -207,75 +203,62 @@ def expect_lldb_gdbserver_replay(
         return {}
 
     context = {"O_count": 0, "O_content": ""}
-    with socket_packet_pump.SocketPacketPump(sock, pump_queues, logger) as pump:
-        # Grab the first sequence entry.
-        sequence_entry = test_sequence.entries.pop(0)
-
-        # While we have an active sequence entry, send messages
-        # destined for the stub and collect/match/process responses
-        # expected from the stub.
-        while sequence_entry:
-            if sequence_entry.is_send_to_remote():
-                # This is an entry to send to the remote debug monitor.
-                send_packet = sequence_entry.get_send_packet()
-                if logger:
-                    if len(send_packet) == 1 and send_packet[0] == chr(3):
-                        packet_desc = "^C"
-                    else:
-                        packet_desc = send_packet
-                    logger.info(
-                        "sending packet to remote: {}".format(packet_desc))
-                sock.sendall(send_packet.encode())
-            else:
-                # This is an entry expecting to receive content from the remote
-                # debug monitor.
 
-                # We'll pull from (and wait on) the queue appropriate for the type of matcher.
-                # We keep separate queues for process output (coming from non-deterministic
-                # $O packet division) and for all other packets.
-                if sequence_entry.is_output_matcher():
-                    try:
-                        # Grab next entry from the output queue.
-                        content = pump.get_output(timeout_seconds)
-                    except queue.Empty:
-                        if logger:
-                            logger.warning(
-                                "timeout waiting for stub output (accumulated output:{})".format(
-                                    pump.get_accumulated_output()))
-                        raise Exception(
-                            "timed out while waiting for output match (accumulated output: {})".format(
-                                pump.get_accumulated_output()))
+    # Grab the first sequence entry.
+    sequence_entry = test_sequence.entries.pop(0)
+
+    # While we have an active sequence entry, send messages
+    # destined for the stub and collect/match/process responses
+    # expected from the stub.
+    while sequence_entry:
+        if sequence_entry.is_send_to_remote():
+            # This is an entry to send to the remote debug monitor.
+            send_packet = sequence_entry.get_send_packet()
+            if logger:
+                if len(send_packet) == 1 and send_packet[0] == chr(3):
+                    packet_desc = "^C"
                 else:
-                    try:
-                        content = pump.get_packet(timeout_seconds)
-                    except queue.Empty:
-                        if logger:
-                            logger.warning(
-                                "timeout waiting for packet match (receive buffer: {})".format(
-                                    pump.get_receive_buffer()))
-                        raise Exception(
-                            "timed out while waiting for packet match (receive buffer: {})".format(
-                                pump.get_receive_buffer()))
-
-                # Give the sequence entry the opportunity to match the content.
-                # Output matchers might match or pass after more output accumulates.
-                # Other packet types generally must match.
-                asserter.assertIsNotNone(content)
-                context = sequence_entry.assert_match(
-                    asserter, content, context=context)
-
-            # Move on to next sequence entry as needed.  Some sequence entries support executing multiple
-            # times in different states (for looping over query/response
-            # packets).
-            if sequence_entry.is_consumed():
-                if len(test_sequence.entries) > 0:
-                    sequence_entry = test_sequence.entries.pop(0)
+                    packet_desc = send_packet
+                logger.info(
+                    "sending packet to remote: {}".format(packet_desc))
+            server.send_raw(send_packet.encode())
+        else:
+            # This is an entry expecting to receive content from the remote
+            # debug monitor.
+
+            # We'll pull from (and wait on) the queue appropriate for the type of matcher.
+            # We keep separate queues for process output (coming from non-deterministic
+            # $O packet division) and for all other packets.
+            try:
+                if sequence_entry.is_output_matcher():
+                    # Grab next entry from the output queue.
+                    content = server.get_raw_output_packet()
                 else:
-                    sequence_entry = None
+                    content = server.get_raw_normal_packet()
+                content = seven.bitcast_to_string(content)
+            except socket.timeout:
+                asserter.fail(
+                        "timed out while waiting for '{}':\n{}".format(sequence_entry, server))
+
+            # Give the sequence entry the opportunity to match the content.
+            # Output matchers might match or pass after more output accumulates.
+            # Other packet types generally must match.
+            asserter.assertIsNotNone(content)
+            context = sequence_entry.assert_match(
+                asserter, content, context=context)
+
+        # Move on to next sequence entry as needed.  Some sequence entries support executing multiple
+        # times in different states (for looping over query/response
+        # packets).
+        if sequence_entry.is_consumed():
+            if len(test_sequence.entries) > 0:
+                sequence_entry = test_sequence.entries.pop(0)
+            else:
+                sequence_entry = None
 
-        # Fill in the O_content entries.
-        context["O_count"] = 1
-        context["O_content"] = pump.get_accumulated_output()
+    # Fill in the O_content entries.
+    context["O_count"] = 1
+    context["O_content"] = server.consume_accumulated_output()
 
     return context
 
@@ -950,9 +933,99 @@ def process_is_running(pid, unknown_value=True):
     # Check if the pid is in the process_ids
     return pid in process_ids
 
-if __name__ == '__main__':
-    EXE_PATH = get_lldb_server_exe()
-    if EXE_PATH:
-        print("lldb-server path detected: {}".format(EXE_PATH))
+def _handle_output_packet_string(packet_contents):
+    if (not packet_contents) or (len(packet_contents) < 1):
+        return None
+    elif packet_contents[0:1] != b"O":
+        return None
+    elif packet_contents == b"OK":
+        return None
     else:
-        print("lldb-server could not be found")
+        return binascii.unhexlify(packet_contents[1:])
+
+class Server(object):
+
+    _GDB_REMOTE_PACKET_REGEX = re.compile(br'^\$([^\#]*)#[0-9a-fA-F]{2}')
+
+    class ChecksumMismatch(Exception):
+        pass
+
+    def __init__(self, sock, proc = None):
+        self._accumulated_output = b""
+        self._receive_buffer = b""
+        self._normal_queue = []
+        self._output_queue = []
+        self._sock = sock
+        self._proc = proc
+
+    def send_raw(self, frame):
+        self._sock.sendall(frame)
+
+    def _read(self, q):
+        while not q:
+            new_bytes = self._sock.recv(4096)
+            self._process_new_bytes(new_bytes)
+        return q.pop(0)
+
+    def _process_new_bytes(self, new_bytes):
+        # Add new bytes to our accumulated unprocessed packet bytes.
+        self._receive_buffer += new_bytes
+
+        # Parse fully-formed packets into individual packets.
+        has_more = len(self._receive_buffer) > 0
+        while has_more:
+            if len(self._receive_buffer) <= 0:
+                has_more = False
+            # handle '+' ack
+            elif self._receive_buffer[0:1] == b"+":
+                self._normal_queue += [b"+"]
+                self._receive_buffer = self._receive_buffer[1:]
+            else:
+                packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
+                    self._receive_buffer)
+                if packet_match:
+                    # Our receive buffer matches a packet at the
+                    # start of the receive buffer.
+                    new_output_content = _handle_output_packet_string(
+                        packet_match.group(1))
+                    if new_output_content:
+                        # This was an $O packet with new content.
+                        self._accumulated_output += new_output_content
+                        self._output_queue += [self._accumulated_output]
+                    else:
+                        # Any packet other than $O.
+                        self._normal_queue += [packet_match.group(0)]
+
+                    # Remove the parsed packet from the receive
+                    # buffer.
+                    self._receive_buffer = self._receive_buffer[
+                        len(packet_match.group(0)):]
+                else:
+                    # We don't have enough in the receive bufferto make a full
+                    # packet. Stop trying until we read more.
+                    has_more = False
+
+    def get_raw_output_packet(self):
+        return self._read(self._output_queue)
+
+    def get_raw_normal_packet(self):
+        return self._read(self._normal_queue)
+
+    def get_accumulated_output(self):
+        return self._accumulated_output
+
+    def consume_accumulated_output(self):
+        output = self._accumulated_output
+        self._accumulated_output = b""
+        return output
+
+    def __str__(self):
+        return dedent("""\
+            server '{}' on '{}'
+            _receive_buffer: {}
+            _normal_queue: {}
+            _output_queue: {}
+            _accumulated_output: {}
+            """).format(self._proc, self._sock, self._receive_buffer,
+                    self._normal_queue, self._output_queue,
+                    self._accumulated_output)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
deleted file mode 100644
index 6c41ed473b45..000000000000
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
+++ /dev/null
@@ -1,181 +0,0 @@
-
-from __future__ import print_function
-
-
-import re
-import select
-import threading
-import time
-import traceback
-
-from six.moves import queue
-from lldbsuite.support import seven
-
-
-def _handle_output_packet_string(packet_contents):
-    if (not packet_contents) or (len(packet_contents) < 1):
-        return None
-    elif packet_contents[0] != "O":
-        return None
-    elif packet_contents == "OK":
-        return None
-    else:
-        return seven.unhexlify(packet_contents[1:])
-
-
-def _dump_queue(the_queue):
-    while not the_queue.empty():
-        print(repr(the_queue.get(True)))
-        print("\n")
-
-
-class PumpQueues(object):
-
-    def __init__(self):
-        self._output_queue = queue.Queue()
-        self._packet_queue = queue.Queue()
-
-    def output_queue(self):
-        return self._output_queue
-
-    def packet_queue(self):
-        return self._packet_queue
-
-    def verify_queues_empty(self):
-        # Warn if there is any content left in any of the queues.
-        # That would represent unmatched packets.
-        if not self.output_queue().empty():
-            print("warning: output queue entries still exist:")
-            _dump_queue(self.output_queue())
-            print("from here:")
-            traceback.print_stack()
-
-        if not self.packet_queue().empty():
-            print("warning: packet queue entries still exist:")
-            _dump_queue(self.packet_queue())
-            print("from here:")
-            traceback.print_stack()
-
-
-class SocketPacketPump(object):
-    """A threaded packet reader that partitions packets into two streams.
-
-    All incoming $O packet content is accumulated with the current accumulation
-    state put into the OutputQueue.
-
-    All other incoming packets are placed in the packet queue.
-
-    A select thread can be started and stopped, and runs to place packet
-    content into the two queues.
-    """
-
-    _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
-
-    def __init__(self, pump_socket, pump_queues, logger=None):
-        if not pump_socket:
-            raise Exception("pump_socket cannot be None")
-
-        self._socket = pump_socket
-        self._logger = logger
-        self._receive_buffer = ""
-        self._accumulated_output = ""
-        self._pump_queues = pump_queues
-
-    def __enter__(self):
-        self._receive_buffer = ""
-        self._accumulated_output = ""
-        return self
-
-    def __exit__(self, exit_type, value, the_traceback):
-        pass
-
-    def _read(self, timeout_seconds, q):
-        now = time.monotonic()
-        deadline = now + timeout_seconds
-        while q.empty() and now <= deadline:
-            can_read, _, _ = select.select([self._socket], [], [], deadline-now)
-            now = time.monotonic()
-            if can_read and self._socket in can_read:
-                try:
-                    new_bytes = seven.bitcast_to_string(self._socket.recv(4096))
-                    if self._logger and new_bytes and len(new_bytes) > 0:
-                        self._logger.debug(
-                            "pump received bytes: {}".format(new_bytes))
-                except:
-                    # Likely a closed socket.  Done with the pump thread.
-                    if self._logger:
-                        self._logger.debug(
-                            "socket read failed, stopping pump read thread\n" +
-                            traceback.format_exc(3))
-                        break
-                self._process_new_bytes(new_bytes)
-        if q.empty():
-            raise queue.Empty()
-        return q.get(True)
-
-    def get_output(self, timeout_seconds):
-        return self._read(timeout_seconds, self._pump_queues.output_queue())
-
-    def get_packet(self, timeout_seconds):
-        return self._read(timeout_seconds, self._pump_queues.packet_queue())
-
-    def _process_new_bytes(self, new_bytes):
-        if not new_bytes:
-            return
-        if len(new_bytes) < 1:
-            return
-
-        # Add new bytes to our accumulated unprocessed packet bytes.
-        self._receive_buffer += new_bytes
-
-        # Parse fully-formed packets into individual packets.
-        has_more = len(self._receive_buffer) > 0
-        while has_more:
-            if len(self._receive_buffer) <= 0:
-                has_more = False
-            # handle '+' ack
-            elif self._receive_buffer[0] == "+":
-                self._pump_queues.packet_queue().put("+")
-                self._receive_buffer = self._receive_buffer[1:]
-                if self._logger:
-                    self._logger.debug(
-                        "parsed packet from stub: +\n" +
-                        "new receive_buffer: {}".format(
-                            self._receive_buffer))
-            else:
-                packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
-                    self._receive_buffer)
-                if packet_match:
-                    # Our receive buffer matches a packet at the
-                    # start of the receive buffer.
-                    new_output_content = _handle_output_packet_string(
-                        packet_match.group(1))
-                    if new_output_content:
-                        # This was an $O packet with new content.
-                        self._accumulated_output += new_output_content
-                        self._pump_queues.output_queue().put(self._accumulated_output)
-                    else:
-                        # Any packet other than $O.
-                        self._pump_queues.packet_queue().put(packet_match.group(0))
-
-                    # Remove the parsed packet from the receive
-                    # buffer.
-                    self._receive_buffer = self._receive_buffer[
-                        len(packet_match.group(0)):]
-                    if self._logger:
-                        self._logger.debug(
-                            "parsed packet from stub: " +
-                            packet_match.group(0))
-                        self._logger.debug(
-                            "new receive_buffer: " +
-                            self._receive_buffer)
-                else:
-                    # We don't have enough in the receive bufferto make a full
-                    # packet. Stop trying until we read more.
-                    has_more = False
-
-    def get_accumulated_output(self):
-        return self._accumulated_output
-
-    def get_receive_buffer(self):
-        return self._receive_buffer

diff --git a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
index 94e628c811af..5eec3c44f67a 100644
--- a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
+++ b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
@@ -27,6 +27,7 @@ def init_lldb_server(self):
         self.stub_hostname = "localhost"
         self.port = int(lldbutil.wait_for_file_on_target(self, port_file))
         self.sock = self.create_socket()
+        self._server = Server(self.sock, server)
 
         self.add_no_ack_remote_stream()
 


        


More information about the lldb-commits mailing list