[Lldb-commits] [lldb] a2f4f7d - [lldb/test] Refactor socket_packet_pump
Pavel Labath via lldb-commits
lldb-commits at lists.llvm.org
Mon Dec 7 00:28:55 PST 2020
Author: Pavel Labath
Date: 2020-12-07T09:24:13+01:00
New Revision: a2f4f7daf76c767efd668390bc7f8b99bdb1218c
URL: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c
DIFF: https://github.com/llvm/llvm-project/commit/a2f4f7daf76c767efd668390bc7f8b99bdb1218c.diff
LOG: [lldb/test] Refactor socket_packet_pump
Now that the class does not use a thread, the name is no longer
appropriate. Rename the class to "Server" and make it a long-lived
object (instead of recreating it for every expect_gdbremote_sequence
call). The idea is to make this class a wrapper for all communication
with debug/lldb-server. This will enable some additional cleanups as we
had some duplication between socket_pump non-pump code paths.
Also squeeze in some small improvements:
- use python-level timeouts on sockets instead of the manual select
calls
- use byte arrays instead of strings when working with raw packets
Added:
Modified:
lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
Removed:
lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
################################################################################
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
index 2f278289988c..d7bfb7fbda32 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
@@ -87,7 +87,6 @@ def setUp(self):
self.setUpBaseLogging()
self.debug_monitor_extra_args = []
- self._pump_queues = socket_packet_pump.PumpQueues()
if self.isVerboseLoggingRequested():
# If requested, full logs go to a log file
@@ -118,8 +117,6 @@ def setUp(self):
self.stub_hostname = "localhost"
def tearDown(self):
- self._pump_queues.verify_queues_empty()
-
self.logger.removeHandler(self._verbose_log_handler)
self._verbose_log_handler = None
TestBase.tearDown(self)
@@ -342,6 +339,7 @@ def launch_debug_monitor(self, attach_pid=None, logfile=None):
if self.reverse_connect:
self.sock = sock.accept()[0]
+ self.sock.settimeout(self.DEFAULT_TIMEOUT)
return server
@@ -354,6 +352,7 @@ def connect_to_debug_monitor(self, attach_pid=None):
# Schedule debug monitor to be shut down during teardown.
logger = self.logger
+ self._server = Server(self.sock, server)
return server
# We're using a random port algorithm to try not to collide with other ports,
@@ -375,6 +374,7 @@ def connect_to_debug_monitor(self, attach_pid=None):
try:
logger.info("Connect attempt %d", connect_attemps + 1)
self.sock = self.create_socket()
+ self._server = Server(self.sock, server)
return server
except _ConnectionRefused as serr:
# Ignore, and try again.
@@ -632,9 +632,8 @@ def parse_register_info_packets(self, context):
def expect_gdbremote_sequence(self):
return expect_lldb_gdbserver_replay(
self,
- self.sock,
+ self._server,
self.test_sequence,
- self._pump_queues,
self.DEFAULT_TIMEOUT * len(self.test_sequence),
self.logger)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
index b5c635a77b5c..07136108b2a4 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/lldbgdbserverutils.py
@@ -3,19 +3,18 @@
from __future__ import division, print_function
-
+import binascii
import os
import os.path
import platform
import re
import six
-import socket_packet_pump
+import socket
import subprocess
+from lldbsuite.support import seven
from lldbsuite.test.lldbtest import *
from lldbsuite.test import configuration
-
-from six.moves import queue
-
+from textwrap import dedent
def _get_debug_monitor_from_lldb(lldb_exe, debug_monitor_basename):
"""Return the debug monitor exe path given the lldb exe path.
@@ -165,9 +164,8 @@ def assert_packets_equal(asserter, actual_packet, expected_packet):
def expect_lldb_gdbserver_replay(
asserter,
- sock,
+ server,
test_sequence,
- pump_queues,
timeout_seconds,
logger=None):
"""Replay socket communication with lldb-gdbserver and verify responses.
@@ -175,8 +173,6 @@ def expect_lldb_gdbserver_replay(
Args:
asserter: the object providing assertEqual(first, second, msg=None), e.g. TestCase instance.
- sock: the TCP socket connected to the lldb-gdbserver exe.
-
test_sequence: a GdbRemoteTestSequence instance that describes
the messages sent to the gdb remote and the responses
expected from it.
@@ -207,75 +203,62 @@ def expect_lldb_gdbserver_replay(
return {}
context = {"O_count": 0, "O_content": ""}
- with socket_packet_pump.SocketPacketPump(sock, pump_queues, logger) as pump:
- # Grab the first sequence entry.
- sequence_entry = test_sequence.entries.pop(0)
-
- # While we have an active sequence entry, send messages
- # destined for the stub and collect/match/process responses
- # expected from the stub.
- while sequence_entry:
- if sequence_entry.is_send_to_remote():
- # This is an entry to send to the remote debug monitor.
- send_packet = sequence_entry.get_send_packet()
- if logger:
- if len(send_packet) == 1 and send_packet[0] == chr(3):
- packet_desc = "^C"
- else:
- packet_desc = send_packet
- logger.info(
- "sending packet to remote: {}".format(packet_desc))
- sock.sendall(send_packet.encode())
- else:
- # This is an entry expecting to receive content from the remote
- # debug monitor.
- # We'll pull from (and wait on) the queue appropriate for the type of matcher.
- # We keep separate queues for process output (coming from non-deterministic
- # $O packet division) and for all other packets.
- if sequence_entry.is_output_matcher():
- try:
- # Grab next entry from the output queue.
- content = pump.get_output(timeout_seconds)
- except queue.Empty:
- if logger:
- logger.warning(
- "timeout waiting for stub output (accumulated output:{})".format(
- pump.get_accumulated_output()))
- raise Exception(
- "timed out while waiting for output match (accumulated output: {})".format(
- pump.get_accumulated_output()))
+ # Grab the first sequence entry.
+ sequence_entry = test_sequence.entries.pop(0)
+
+ # While we have an active sequence entry, send messages
+ # destined for the stub and collect/match/process responses
+ # expected from the stub.
+ while sequence_entry:
+ if sequence_entry.is_send_to_remote():
+ # This is an entry to send to the remote debug monitor.
+ send_packet = sequence_entry.get_send_packet()
+ if logger:
+ if len(send_packet) == 1 and send_packet[0] == chr(3):
+ packet_desc = "^C"
else:
- try:
- content = pump.get_packet(timeout_seconds)
- except queue.Empty:
- if logger:
- logger.warning(
- "timeout waiting for packet match (receive buffer: {})".format(
- pump.get_receive_buffer()))
- raise Exception(
- "timed out while waiting for packet match (receive buffer: {})".format(
- pump.get_receive_buffer()))
-
- # Give the sequence entry the opportunity to match the content.
- # Output matchers might match or pass after more output accumulates.
- # Other packet types generally must match.
- asserter.assertIsNotNone(content)
- context = sequence_entry.assert_match(
- asserter, content, context=context)
-
- # Move on to next sequence entry as needed. Some sequence entries support executing multiple
- # times in
diff erent states (for looping over query/response
- # packets).
- if sequence_entry.is_consumed():
- if len(test_sequence.entries) > 0:
- sequence_entry = test_sequence.entries.pop(0)
+ packet_desc = send_packet
+ logger.info(
+ "sending packet to remote: {}".format(packet_desc))
+ server.send_raw(send_packet.encode())
+ else:
+ # This is an entry expecting to receive content from the remote
+ # debug monitor.
+
+ # We'll pull from (and wait on) the queue appropriate for the type of matcher.
+ # We keep separate queues for process output (coming from non-deterministic
+ # $O packet division) and for all other packets.
+ try:
+ if sequence_entry.is_output_matcher():
+ # Grab next entry from the output queue.
+ content = server.get_raw_output_packet()
else:
- sequence_entry = None
+ content = server.get_raw_normal_packet()
+ content = seven.bitcast_to_string(content)
+ except socket.timeout:
+ asserter.fail(
+ "timed out while waiting for '{}':\n{}".format(sequence_entry, server))
+
+ # Give the sequence entry the opportunity to match the content.
+ # Output matchers might match or pass after more output accumulates.
+ # Other packet types generally must match.
+ asserter.assertIsNotNone(content)
+ context = sequence_entry.assert_match(
+ asserter, content, context=context)
+
+ # Move on to next sequence entry as needed. Some sequence entries support executing multiple
+ # times in
diff erent states (for looping over query/response
+ # packets).
+ if sequence_entry.is_consumed():
+ if len(test_sequence.entries) > 0:
+ sequence_entry = test_sequence.entries.pop(0)
+ else:
+ sequence_entry = None
- # Fill in the O_content entries.
- context["O_count"] = 1
- context["O_content"] = pump.get_accumulated_output()
+ # Fill in the O_content entries.
+ context["O_count"] = 1
+ context["O_content"] = server.consume_accumulated_output()
return context
@@ -950,9 +933,99 @@ def process_is_running(pid, unknown_value=True):
# Check if the pid is in the process_ids
return pid in process_ids
-if __name__ == '__main__':
- EXE_PATH = get_lldb_server_exe()
- if EXE_PATH:
- print("lldb-server path detected: {}".format(EXE_PATH))
+def _handle_output_packet_string(packet_contents):
+ if (not packet_contents) or (len(packet_contents) < 1):
+ return None
+ elif packet_contents[0:1] != b"O":
+ return None
+ elif packet_contents == b"OK":
+ return None
else:
- print("lldb-server could not be found")
+ return binascii.unhexlify(packet_contents[1:])
+
+class Server(object):
+
+ _GDB_REMOTE_PACKET_REGEX = re.compile(br'^\$([^\#]*)#[0-9a-fA-F]{2}')
+
+ class ChecksumMismatch(Exception):
+ pass
+
+ def __init__(self, sock, proc = None):
+ self._accumulated_output = b""
+ self._receive_buffer = b""
+ self._normal_queue = []
+ self._output_queue = []
+ self._sock = sock
+ self._proc = proc
+
+ def send_raw(self, frame):
+ self._sock.sendall(frame)
+
+ def _read(self, q):
+ while not q:
+ new_bytes = self._sock.recv(4096)
+ self._process_new_bytes(new_bytes)
+ return q.pop(0)
+
+ def _process_new_bytes(self, new_bytes):
+ # Add new bytes to our accumulated unprocessed packet bytes.
+ self._receive_buffer += new_bytes
+
+ # Parse fully-formed packets into individual packets.
+ has_more = len(self._receive_buffer) > 0
+ while has_more:
+ if len(self._receive_buffer) <= 0:
+ has_more = False
+ # handle '+' ack
+ elif self._receive_buffer[0:1] == b"+":
+ self._normal_queue += [b"+"]
+ self._receive_buffer = self._receive_buffer[1:]
+ else:
+ packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
+ self._receive_buffer)
+ if packet_match:
+ # Our receive buffer matches a packet at the
+ # start of the receive buffer.
+ new_output_content = _handle_output_packet_string(
+ packet_match.group(1))
+ if new_output_content:
+ # This was an $O packet with new content.
+ self._accumulated_output += new_output_content
+ self._output_queue += [self._accumulated_output]
+ else:
+ # Any packet other than $O.
+ self._normal_queue += [packet_match.group(0)]
+
+ # Remove the parsed packet from the receive
+ # buffer.
+ self._receive_buffer = self._receive_buffer[
+ len(packet_match.group(0)):]
+ else:
+ # We don't have enough in the receive bufferto make a full
+ # packet. Stop trying until we read more.
+ has_more = False
+
+ def get_raw_output_packet(self):
+ return self._read(self._output_queue)
+
+ def get_raw_normal_packet(self):
+ return self._read(self._normal_queue)
+
+ def get_accumulated_output(self):
+ return self._accumulated_output
+
+ def consume_accumulated_output(self):
+ output = self._accumulated_output
+ self._accumulated_output = b""
+ return output
+
+ def __str__(self):
+ return dedent("""\
+ server '{}' on '{}'
+ _receive_buffer: {}
+ _normal_queue: {}
+ _output_queue: {}
+ _accumulated_output: {}
+ """).format(self._proc, self._sock, self._receive_buffer,
+ self._normal_queue, self._output_queue,
+ self._accumulated_output)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
deleted file mode 100644
index 6c41ed473b45..000000000000
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
+++ /dev/null
@@ -1,181 +0,0 @@
-
-from __future__ import print_function
-
-
-import re
-import select
-import threading
-import time
-import traceback
-
-from six.moves import queue
-from lldbsuite.support import seven
-
-
-def _handle_output_packet_string(packet_contents):
- if (not packet_contents) or (len(packet_contents) < 1):
- return None
- elif packet_contents[0] != "O":
- return None
- elif packet_contents == "OK":
- return None
- else:
- return seven.unhexlify(packet_contents[1:])
-
-
-def _dump_queue(the_queue):
- while not the_queue.empty():
- print(repr(the_queue.get(True)))
- print("\n")
-
-
-class PumpQueues(object):
-
- def __init__(self):
- self._output_queue = queue.Queue()
- self._packet_queue = queue.Queue()
-
- def output_queue(self):
- return self._output_queue
-
- def packet_queue(self):
- return self._packet_queue
-
- def verify_queues_empty(self):
- # Warn if there is any content left in any of the queues.
- # That would represent unmatched packets.
- if not self.output_queue().empty():
- print("warning: output queue entries still exist:")
- _dump_queue(self.output_queue())
- print("from here:")
- traceback.print_stack()
-
- if not self.packet_queue().empty():
- print("warning: packet queue entries still exist:")
- _dump_queue(self.packet_queue())
- print("from here:")
- traceback.print_stack()
-
-
-class SocketPacketPump(object):
- """A threaded packet reader that partitions packets into two streams.
-
- All incoming $O packet content is accumulated with the current accumulation
- state put into the OutputQueue.
-
- All other incoming packets are placed in the packet queue.
-
- A select thread can be started and stopped, and runs to place packet
- content into the two queues.
- """
-
- _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
-
- def __init__(self, pump_socket, pump_queues, logger=None):
- if not pump_socket:
- raise Exception("pump_socket cannot be None")
-
- self._socket = pump_socket
- self._logger = logger
- self._receive_buffer = ""
- self._accumulated_output = ""
- self._pump_queues = pump_queues
-
- def __enter__(self):
- self._receive_buffer = ""
- self._accumulated_output = ""
- return self
-
- def __exit__(self, exit_type, value, the_traceback):
- pass
-
- def _read(self, timeout_seconds, q):
- now = time.monotonic()
- deadline = now + timeout_seconds
- while q.empty() and now <= deadline:
- can_read, _, _ = select.select([self._socket], [], [], deadline-now)
- now = time.monotonic()
- if can_read and self._socket in can_read:
- try:
- new_bytes = seven.bitcast_to_string(self._socket.recv(4096))
- if self._logger and new_bytes and len(new_bytes) > 0:
- self._logger.debug(
- "pump received bytes: {}".format(new_bytes))
- except:
- # Likely a closed socket. Done with the pump thread.
- if self._logger:
- self._logger.debug(
- "socket read failed, stopping pump read thread\n" +
- traceback.format_exc(3))
- break
- self._process_new_bytes(new_bytes)
- if q.empty():
- raise queue.Empty()
- return q.get(True)
-
- def get_output(self, timeout_seconds):
- return self._read(timeout_seconds, self._pump_queues.output_queue())
-
- def get_packet(self, timeout_seconds):
- return self._read(timeout_seconds, self._pump_queues.packet_queue())
-
- def _process_new_bytes(self, new_bytes):
- if not new_bytes:
- return
- if len(new_bytes) < 1:
- return
-
- # Add new bytes to our accumulated unprocessed packet bytes.
- self._receive_buffer += new_bytes
-
- # Parse fully-formed packets into individual packets.
- has_more = len(self._receive_buffer) > 0
- while has_more:
- if len(self._receive_buffer) <= 0:
- has_more = False
- # handle '+' ack
- elif self._receive_buffer[0] == "+":
- self._pump_queues.packet_queue().put("+")
- self._receive_buffer = self._receive_buffer[1:]
- if self._logger:
- self._logger.debug(
- "parsed packet from stub: +\n" +
- "new receive_buffer: {}".format(
- self._receive_buffer))
- else:
- packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
- self._receive_buffer)
- if packet_match:
- # Our receive buffer matches a packet at the
- # start of the receive buffer.
- new_output_content = _handle_output_packet_string(
- packet_match.group(1))
- if new_output_content:
- # This was an $O packet with new content.
- self._accumulated_output += new_output_content
- self._pump_queues.output_queue().put(self._accumulated_output)
- else:
- # Any packet other than $O.
- self._pump_queues.packet_queue().put(packet_match.group(0))
-
- # Remove the parsed packet from the receive
- # buffer.
- self._receive_buffer = self._receive_buffer[
- len(packet_match.group(0)):]
- if self._logger:
- self._logger.debug(
- "parsed packet from stub: " +
- packet_match.group(0))
- self._logger.debug(
- "new receive_buffer: " +
- self._receive_buffer)
- else:
- # We don't have enough in the receive bufferto make a full
- # packet. Stop trying until we read more.
- has_more = False
-
- def get_accumulated_output(self):
- return self._accumulated_output
-
- def get_receive_buffer(self):
- return self._receive_buffer
diff --git a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
index 94e628c811af..5eec3c44f67a 100644
--- a/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
+++ b/lldb/test/API/tools/lldb-server/TestGdbRemoteCompletion.py
@@ -27,6 +27,7 @@ def init_lldb_server(self):
self.stub_hostname = "localhost"
self.port = int(lldbutil.wait_for_file_on_target(self, port_file))
self.sock = self.create_socket()
+ self._server = Server(self.sock, server)
self.add_no_ack_remote_stream()
More information about the lldb-commits
mailing list