[Lldb-commits] [lldb] [lldb-dap] Test Gardening, improving DebugCommunication. (PR #141689)
John Harrison via lldb-commits
lldb-commits at lists.llvm.org
Wed May 28 15:49:30 PDT 2025
https://github.com/ashgti updated https://github.com/llvm/llvm-project/pull/141689
>From ad0a9cd321d260cd87b852b335da9565f8326449 Mon Sep 17 00:00:00 2001
From: John Harrison <harjohn at google.com>
Date: Tue, 27 May 2025 16:40:10 -0700
Subject: [PATCH 1/2] [lldb-dap] Test Gardening, improving DebugCommunication.
Improve the readability and correctness of DebugCommunication by adding type annotations throughout the library and reworking a few key areas of the implementation so they handle state more correctly.
Specifically, this refactors `DebugCommunication._handle_recv_packet` so that state changes stay consistent with the reader thread, and it reworks the `DebugCommunication._recv_packet` helper, adding a few small helpers to make it easier to follow.
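For reference, the receive path now follows a predicate-plus-Condition pattern instead of filter arguments. Below is a minimal, illustrative sketch of that pattern only; the class and names are hypothetical and not the actual DebugCommunication code:
```
import threading
from typing import Callable, List, Optional, Union


class _PacketQueue:
    """Sketch: a reader thread appends packets under a Condition and
    consumers block in Condition.wait_for() until a matching packet or
    an EOFError sentinel arrives."""

    def __init__(self) -> None:
        self._packets: List[Union[dict, EOFError]] = []
        self._condition = threading.Condition()

    def enqueue(self, packet: Union[dict, EOFError]) -> None:
        # Called from the reader thread for every decoded packet (or EOF).
        with self._condition:
            self._packets.append(packet)
            self._condition.notify()

    def recv(
        self,
        predicate: Callable[[dict], bool],
        timeout: Optional[float] = None,
    ) -> Optional[dict]:
        # Returns the first matching packet, or None on timeout/EOF.
        with self._condition:
            def _match():
                return next(
                    (p for p in self._packets
                     if isinstance(p, EOFError) or predicate(p)),
                    None,
                )

            packet = self._condition.wait_for(_match, timeout=timeout)
            if packet is None or isinstance(packet, EOFError):
                return None
            self._packets.remove(packet)
            return packet


if __name__ == "__main__":
    q = _PacketQueue()
    q.enqueue({"type": "response", "request_seq": 1, "success": True})
    print(q.recv(lambda p: p.get("request_seq") == 1, timeout=1.0))
```
The point of the pattern is that timeouts, EOF, and packet matching are all handled in one place inside the lock, which is what the refactored `_recv_packet` does.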
---
.../test/tools/lldb-dap/dap_server.py | 557 ++++++++++--------
.../test/tools/lldb-dap/lldbdap_testcase.py | 16 +-
.../tools/lldb-dap/cancel/TestDAP_cancel.py | 27 +-
3 files changed, 340 insertions(+), 260 deletions(-)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index a028381a0a4f9..cc235f4fe8b1a 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -12,13 +12,52 @@
import sys
import threading
import time
-from typing import Any, Optional, Union, BinaryIO, TextIO
+from typing import (
+ Any,
+ Optional,
+ Union,
+ BinaryIO,
+ TextIO,
+ TypedDict,
+ Literal,
+ Callable,
+ TypeVar,
+)
## DAP type references
-Event = dict[str, Any]
-Request = dict[str, Any]
-Response = dict[str, Any]
+
+
+class Event(TypedDict):
+ type: Literal["event"]
+ seq: Literal[0]
+ event: str
+ body: Optional[dict]
+
+
+class Request(TypedDict):
+ type: Literal["request"]
+ seq: int
+ command: str
+ arguments: Optional[dict]
+
+
+class Response(TypedDict):
+ type: Literal["response"]
+ seq: Literal[0]
+ request_seq: int
+ success: bool
+ command: str
+ message: Optional[str]
+ body: Optional[dict]
+
+
+_T = TypeVar("_T")
ProtocolMessage = Union[Event, Request, Response]
+# An internal type used for tracking protocol messages and an EOF sentinel
+# value. 'None' cannot easily be used as a sentinel because it is a falsey
+# value. When returned outside of DebugCommunication an EOFError is typically
+# converted into 'None'.
+_InternalProtocolMessage = Union[Event, Request, Response, EOFError]
def dump_memory(base_addr, data, num_per_line, outfile):
@@ -58,44 +97,42 @@ def dump_memory(base_addr, data, num_per_line, outfile):
outfile.write("\n")
-def read_packet(f, verbose=False, trace_file=None):
+def read_packet(
+ f: BinaryIO, verbose=False, trace_file=None
+) -> _InternalProtocolMessage:
"""Decode a JSON packet that starts with the content length and is
followed by the JSON bytes from a file 'f'. Returns None on EOF.
"""
- line = f.readline().decode("utf-8")
+ line = f.readline().decode()
if len(line) == 0:
- return None # EOF.
+ return EOFError() # EOF.
# Watch for line that starts with the prefix
prefix = "Content-Length: "
if line.startswith(prefix):
# Decode length of JSON bytes
if verbose:
- print('content: "%s"' % (line))
+ print("content:", line)
length = int(line[len(prefix) :])
if verbose:
- print('length: "%u"' % (length))
+ print("length:", length)
# Skip empty line
- line = f.readline()
+ line = f.readline().decode()
if verbose:
- print('empty: "%s"' % (line))
+ print("empty:", line)
# Read JSON bytes
- json_str = f.read(length)
+ json_str = f.read(length).decode()
if verbose:
- print('json: "%s"' % (json_str))
+ print("json:", json_str)
if trace_file:
- trace_file.write("from adapter:\n%s\n" % (json_str))
+ trace_file.write(f"from adapter:\n{json_str}\n")
# Decode the JSON bytes into a python dictionary
return json.loads(json_str)
raise Exception("unexpected malformed message from lldb-dap: " + line)
-def packet_type_is(packet, packet_type):
- return "type" in packet and packet["type"] == packet_type
-
-
-def dump_dap_log(log_file):
+def dump_dap_log(log_file: str):
print("========= DEBUG ADAPTER PROTOCOL LOGS =========", file=sys.stderr)
if log_file is None:
print("no log file available", file=sys.stderr)
@@ -124,8 +161,8 @@ def __init__(
def __str__(self):
return f"Source(name={self.name}, path={self.path}), source_reference={self.source_reference})"
- def as_dict(self):
- source_dict = {}
+ def as_dict(self) -> dict:
+ source_dict: dict[str, Any] = {}
if self._name is not None:
source_dict["name"] = self._name
if self._path is not None:
@@ -141,38 +178,42 @@ def __init__(
recv: BinaryIO,
send: BinaryIO,
init_commands: list[str],
- log_file: Optional[TextIO] = None,
+ log_file: Optional[str] = None,
):
# For debugging test failures, try setting `trace_file = sys.stderr`.
self.trace_file: Optional[TextIO] = None
self.log_file = log_file
self.send = send
self.recv = recv
- self.recv_packets: list[Optional[ProtocolMessage]] = []
+ self.recv_packets: list[_InternalProtocolMessage] = []
self.recv_condition = threading.Condition()
self.recv_thread = threading.Thread(target=self._read_packet_thread)
- self.process_event_body = None
self.exit_status: Optional[int] = None
+ self.init_commands = init_commands
self.initialize_body = None
+ self.initialized = False
+ self.configuration_done_sent = False
+ self.process_event_body: Optional[dict] = None
+ self.terminated = False
self.progress_events: list[Event] = []
- self.reverse_requests = []
+ self.reverse_requests: list[Request] = []
self.sequence = 1
- self.threads = None
- self.thread_stop_reasons = {}
- self.recv_thread.start()
self.output_condition = threading.Condition()
self.output: dict[str, list[str]] = {}
- self.configuration_done_sent = False
- self.initialized = False
- self.frame_scopes = {}
- self.init_commands = init_commands
+
+ # debuggee state
+ self.threads = None
+ self.thread_stop_reasons: dict[str, Any] = {}
+ self.frame_scopes: dict[str, Any] = {}
+
+ self.recv_thread.start()
@classmethod
def encode_content(cls, s: str) -> bytes:
return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@classmethod
- def validate_response(cls, command, response):
+ def validate_response(cls, command: Request, response: Response):
if command["command"] != response["command"]:
raise ValueError("command mismatch in response")
if command["seq"] != response["request_seq"]:
@@ -224,84 +265,91 @@ def collect_output(self, category, timeout_secs, pattern, clear=True):
break
return collected_output if collected_output else None
- def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
- self.recv_condition.acquire()
+ def _enqueue_recv_packet(self, packet: Union[ProtocolMessage, EOFError]):
self.recv_packets.append(packet)
self.recv_condition.notify()
- self.recv_condition.release()
- def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
+ def _handle_recv_packet(self, packet: _InternalProtocolMessage) -> bool:
"""Called by the read thread that is waiting for all incoming packets
to store the incoming packet in "self.recv_packets" in a thread safe
way. This function will then signal the "self.recv_condition" to
indicate a new packet is available. Returns True if the caller
should keep calling this function for more packets.
"""
- # If EOF, notify the read thread by enqueuing a None.
- if not packet:
- self._enqueue_recv_packet(None)
- return False
-
- # Check the packet to see if is an event packet
- keepGoing = True
- packet_type = packet["type"]
- if packet_type == "event":
- event = packet["event"]
- body = None
- if "body" in packet:
- body = packet["body"]
- # Handle the event packet and cache information from these packets
- # as they come in
- if event == "output":
- # Store any output we receive so clients can retrieve it later.
- category = body["category"]
- output = body["output"]
- self.output_condition.acquire()
- if category in self.output:
- self.output[category] += output
- else:
- self.output[category] = output
- self.output_condition.notify()
- self.output_condition.release()
- # no need to add 'output' event packets to our packets list
- return keepGoing
- elif event == "initialized":
- self.initialized = True
- elif event == "process":
- # When a new process is attached or launched, remember the
- # details that are available in the body of the event
- self.process_event_body = body
- elif event == "exited":
- # Process exited, mark the status to indicate the process is not
- # alive.
- self.exit_status = body["exitCode"]
- elif event == "continued":
- # When the process continues, clear the known threads and
- # thread_stop_reasons.
- all_threads_continued = body.get("allThreadsContinued", True)
- tid = body["threadId"]
- if tid in self.thread_stop_reasons:
- del self.thread_stop_reasons[tid]
- self._process_continued(all_threads_continued)
- elif event == "stopped":
- # Each thread that stops with a reason will send a
- # 'stopped' event. We need to remember the thread stop
- # reasons since the 'threads' command doesn't return
- # that information.
- self._process_stopped()
- tid = body["threadId"]
- self.thread_stop_reasons[tid] = body
- elif event.startswith("progress"):
- # Progress events come in as 'progressStart', 'progressUpdate',
- # and 'progressEnd' events. Keep these around in case test
- # cases want to verify them.
- self.progress_events.append(packet)
-
- elif packet_type == "response":
- if packet["command"] == "disconnect":
- keepGoing = False
- self._enqueue_recv_packet(packet)
- return keepGoing
+ # Hold the recv_condition for consistency of debugger state.
+ with self.recv_condition:
+ if isinstance(packet, EOFError):
+ self._enqueue_recv_packet(packet)
+ return False
+
+ keep_going = True
+
+ # Check the packet to see if is an event packet
+ if packet["type"] == "event" and "event" in packet:
+ event = packet["event"]
+ body = packet.get("body")
+ # Handle the event packet and cache DAP stateful information from
+ # these packets as they come in.
+ if event == "output" and body is not None:
+ # Store any output we receive so clients can retrieve it later.
+ category = body["category"]
+ output = body["output"]
+ self.output_condition.acquire()
+ if category in self.output:
+ self.output[category] += output
+ else:
+ self.output[category] = output
+ self.output_condition.notify()
+ self.output_condition.release()
+ # no need to add 'output' event packets to our packets list
+ return keep_going
+ elif event == "initialized":
+ self.initialized = True
+ elif event == "process" and body is not None:
+ # When a new process is attached or launched, remember the
+ # details that are available in the body of the event
+ self.process_event_body = body
+ elif event == "terminated":
+ # If we get the 'terminated' event then lldb-dap has exited
+ # itself.
+ self.terminated = True
+ elif event == "exited" and body is not None:
+ # Process exited, mark the status to indicate the process is not
+ # alive.
+ self.exit_status = body.get("exitCode", 0)
+ elif event == "continued" and body is not None:
+ # When the process continues, clear the known threads and
+ # thread_stop_reasons.
+ all_threads_continued = body.get("allThreadsContinued", True)
+ tid = body["threadId"]
+ if tid in self.thread_stop_reasons:
+ del self.thread_stop_reasons[tid]
+ self._process_continued(all_threads_continued)
+ elif event == "stopped" and body is not None:
+ # Each thread that stops with a reason will send a
+ # 'stopped' event. We need to remember the thread stop
+ # reasons since the 'threads' command doesn't return
+ # that information.
+ self._process_stopped()
+ tid = body["threadId"]
+ self.thread_stop_reasons[tid] = body
+ elif event.startswith("progress"):
+ # Progress events come in as 'progressStart', 'progressUpdate',
+ # and 'progressEnd' events. Keep these around in case test
+ # cases want to verify them.
+ self.progress_events.append(packet)
+
+ elif packet["type"] == "response":
+ if packet["command"] == "disconnect":
+ keep_going = False
+
+ elif packet["type"] == "request":
+ # Handle reverse requests and keep processing.
+ self._handle_reverse_request(packet)
+ return keep_going
+
+ self._enqueue_recv_packet(packet)
+ return keep_going
def _process_continued(self, all_threads_continued: bool):
self.threads = None
@@ -309,14 +357,63 @@ def _process_continued(self, all_threads_continued: bool):
if all_threads_continued:
self.thread_stop_reasons = {}
- def send_packet(self, command_dict: Request, set_sequence=True):
+ def _handle_reverse_request(self, request: Request):
+ self.reverse_requests.append(request)
+ arguments = request.get("arguments")
+ if request["command"] == "runInTerminal" and arguments is not None:
+ in_shell = arguments.get("argsCanBeInterpretedByShell", False)
+ proc = subprocess.Popen(
+ arguments["args"],
+ env=arguments.get("env", {}),
+ cwd=arguments["cwd"],
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ shell=in_shell,
+ )
+ body = {}
+ if in_shell:
+ body["shellProcessId"] = proc.pid
+ else:
+ body["processId"] = proc.pid
+ self.send_packet(
+ {
+ "type": "response",
+ "seq": 0,
+ "request_seq": request["seq"],
+ "success": True,
+ "command": "runInTerminal",
+ "message": None,
+ "body": body,
+ }
+ )
+ elif request["command"] == "startDebugging":
+ self.send_packet(
+ {
+ "type": "response",
+ "seq": 0,
+ "request_seq": request["seq"],
+ "success": True,
+ "message": None,
+ "command": "startDebugging",
+ "body": {},
+ }
+ )
+ else:
+ desc = 'unknown reverse request "%s"' % (request["command"])
+ raise ValueError(desc)
+
+ def send_packet(self, command_dict: ProtocolMessage) -> int:
"""Take the "command_dict" python dictionary and encode it as a JSON
string and send the contents as a packet to the VSCode debug
- adapter"""
+ adapter."""
+ seq = 0
# Set the sequence ID for this command automatically
- if set_sequence:
- command_dict["seq"] = self.sequence
+ if command_dict["type"] == "request":
+ seq = command_dict["seq"] = self.sequence
self.sequence += 1
+ else:
+ command_dict["seq"] = 0
# Encode our command dictionary as a JSON string
json_str = json.dumps(command_dict, separators=(",", ":"))
if self.trace_file:
@@ -326,104 +423,70 @@ def send_packet(self, command_dict: Request, set_sequence=True):
# Send the encoded JSON packet and flush the 'send' file
self.send.write(self.encode_content(json_str))
self.send.flush()
+ return seq
- def recv_packet(
+ def receive_response(
self,
- filter_type: Optional[str] = None,
- filter_event: Optional[Union[str, list[str]]] = None,
+ seq: int,
+ ) -> Optional[Response]:
+ """Waits for a response with the associated request_seq."""
+
+ def predicate(p: Response):
+ return p["type"] == "response" and p["request_seq"] == seq
+
+ return self._recv_packet(predicate=predicate)
+
+ def _recv_packet(
+ self,
+ *,
+ predicate: Callable[[_T], bool],
timeout: Optional[float] = None,
- ) -> Optional[ProtocolMessage]:
+ ) -> Optional[_T]:
"""Get a JSON packet from the VSCode debug adapter. This function
assumes a thread that reads packets is running and will deliver
any received packets by calling handle_recv_packet(...). This
function will wait for the packet to arrive and return it when
it does."""
- while True:
- try:
- self.recv_condition.acquire()
- packet = None
- while True:
- for i, curr_packet in enumerate(self.recv_packets):
- if not curr_packet:
- raise EOFError
- packet_type = curr_packet["type"]
- if filter_type is None or packet_type in filter_type:
- if filter_event is None or (
- packet_type == "event"
- and curr_packet["event"] in filter_event
- ):
- packet = self.recv_packets.pop(i)
- break
- if packet:
- break
- # Sleep until packet is received
- len_before = len(self.recv_packets)
- self.recv_condition.wait(timeout)
- len_after = len(self.recv_packets)
- if len_before == len_after:
- return None # Timed out
- return packet
- except EOFError:
+ with self.recv_condition:
+
+ def _predicate():
+ return next(
+ filter(
+ lambda p: isinstance(p, EOFError) or predicate(p),
+ self.recv_packets,
+ ),
+ None,
+ )
+
+ packet = self.recv_condition.wait_for(_predicate, timeout=timeout)
+ if packet is None: # Timeout
+ return None
+ self.recv_packets.remove(packet)
+ if isinstance(packet, EOFError):
return None
- finally:
- self.recv_condition.release()
+ return packet
- def send_recv(self, command):
+ def _send_recv(self, command: Request) -> Optional[Response]:
"""Send a command python dictionary as JSON and receive the JSON
response. Validates that the response is the correct sequence and
- command in the reply. Any events that are received are added to the
- events list in this object"""
- self.send_packet(command)
- done = False
- while not done:
- response_or_request = self.recv_packet(filter_type=["response", "request"])
- if response_or_request is None:
- desc = 'no response for "%s"' % (command["command"])
- raise ValueError(desc)
- if response_or_request["type"] == "response":
- self.validate_response(command, response_or_request)
- return response_or_request
- else:
- self.reverse_requests.append(response_or_request)
- if response_or_request["command"] == "runInTerminal":
- subprocess.Popen(
- response_or_request["arguments"]["args"],
- env=response_or_request["arguments"]["env"],
- )
- self.send_packet(
- {
- "type": "response",
- "request_seq": response_or_request["seq"],
- "success": True,
- "command": "runInTerminal",
- "body": {},
- },
- )
- elif response_or_request["command"] == "startDebugging":
- self.send_packet(
- {
- "type": "response",
- "request_seq": response_or_request["seq"],
- "success": True,
- "command": "startDebugging",
- "body": {},
- },
- )
- else:
- desc = 'unknown reverse request "%s"' % (
- response_or_request["command"]
- )
- raise ValueError(desc)
-
- return None
+ command in the reply."""
+ seq = self.send_packet(command)
+ response = self.receive_response(seq)
+ if response is None:
+ desc = 'no response for "%s"' % (command["command"])
+ raise ValueError(desc)
+ self.validate_response(command, response)
+ return response
def wait_for_event(
- self, filter: Union[str, list[str]], timeout: Optional[float] = None
+ self, filter: list[str] = [], timeout: Optional[float] = None
) -> Optional[Event]:
"""Wait for the first event that matches the filter."""
- return self.recv_packet(
- filter_type="event", filter_event=filter, timeout=timeout
- )
+
+ def predicate(p: Event):
+ return p["type"] == "event" and p["event"] in filter
+
+ return self._recv_packet(predicate=predicate, timeout=timeout)
def wait_for_stopped(
self, timeout: Optional[float] = None
@@ -448,20 +511,22 @@ def wait_for_stopped(
def wait_for_breakpoint_events(self, timeout: Optional[float] = None):
breakpoint_events: list[Event] = []
while True:
- event = self.wait_for_event("breakpoint", timeout=timeout)
+ event = self.wait_for_event(["breakpoint"], timeout=timeout)
if not event:
break
breakpoint_events.append(event)
return breakpoint_events
def wait_for_exited(self, timeout: Optional[float] = None):
- event_dict = self.wait_for_event("exited", timeout=timeout)
+ event_dict = self.wait_for_event(["exited"], timeout=timeout)
if event_dict is None:
raise ValueError("didn't get exited event")
return event_dict
def wait_for_terminated(self, timeout: Optional[float] = None):
- event_dict = self.wait_for_event("terminated", timeout)
+ if self.terminated:
+ raise ValueError("already terminated")
+ event_dict = self.wait_for_event(["terminated"], timeout)
if event_dict is None:
raise ValueError("didn't get terminated event")
return event_dict
@@ -600,7 +665,7 @@ def replay_packets(self, replay_file_path):
raise ValueError("decode packet failed from replay file")
print("Sending:")
pprint.PrettyPrinter(indent=2).pprint(command_dict)
- # raw_input('Press ENTER to send:')
+ # input('Press ENTER to send:')
self.send_packet(command_dict, set_sequence)
mode = "invalid"
elif mode == "recv":
@@ -639,7 +704,7 @@ def request_attach(
gdbRemotePort: Optional[int] = None,
gdbRemoteHostname: Optional[str] = None,
):
- args_dict = {}
+ args_dict: dict[str, Any] = {}
if pid is not None:
args_dict["pid"] = pid
if program is not None:
@@ -671,8 +736,13 @@ def request_attach(
args_dict["gdb-remote-port"] = gdbRemotePort
if gdbRemoteHostname is not None:
args_dict["gdb-remote-hostname"] = gdbRemoteHostname
- command_dict = {"command": "attach", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ command_dict: Request = {
+ "command": "attach",
+ "type": "request",
+ "seq": 0,
+ "arguments": args_dict,
+ }
+ return self._send_recv(command_dict)
def request_breakpointLocations(
self, file_path, line, end_line=None, column=None, end_column=None
@@ -694,7 +764,7 @@ def request_breakpointLocations(
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_configurationDone(self):
command_dict = {
@@ -702,7 +772,7 @@ def request_configurationDone(self):
"type": "request",
"arguments": {},
}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
if response:
self.configuration_done_sent = True
self.request_threads()
@@ -731,7 +801,7 @@ def request_continue(self, threadId=None, singleThread=False):
"type": "request",
"arguments": args_dict,
}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
if response["success"]:
self._process_continued(response["body"]["allThreadsContinued"])
# Caller must still call wait_for_stopped.
@@ -745,23 +815,24 @@ def request_restart(self, restartArguments=None):
if restartArguments:
command_dict["arguments"] = restartArguments
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
# Caller must still call wait_for_stopped.
return response
- def request_disconnect(self, terminateDebuggee=None):
+ def request_disconnect(
+ self,
+ terminateDebuggee: Optional[bool] = None,
+ ):
args_dict = {}
if terminateDebuggee is not None:
- if terminateDebuggee:
- args_dict["terminateDebuggee"] = True
- else:
- args_dict["terminateDebuggee"] = False
- command_dict = {
+ args_dict["terminateDebuggee"] = terminateDebuggee
+ command_dict: Request = {
"command": "disconnect",
"type": "request",
+ "seq": 0,
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_disassemble(
self,
@@ -781,7 +852,7 @@ def request_disassemble(
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)["body"]["instructions"]
+ return self._send_recv(command_dict)["body"]["instructions"]
def request_readMemory(self, memoryReference, offset, count):
args_dict = {
@@ -794,7 +865,7 @@ def request_readMemory(self, memoryReference, offset, count):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_evaluate(self, expression, frameIndex=0, threadId=None, context=None):
stackFrame = self.get_stackFrame(frameIndex=frameIndex, threadId=threadId)
@@ -810,7 +881,7 @@ def request_evaluate(self, expression, frameIndex=0, threadId=None, context=None
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_exceptionInfo(self, threadId=None):
if threadId is None:
@@ -821,7 +892,7 @@ def request_exceptionInfo(self, threadId=None):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_initialize(self, sourceInitFile=False):
command_dict = {
@@ -842,7 +913,7 @@ def request_initialize(self, sourceInitFile=False):
"$__lldb_sourceInitFile": sourceInitFile,
},
}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
if response:
if "body" in response:
self.initialize_body = response["body"]
@@ -877,7 +948,7 @@ def request_launch(
customFrameFormat: Optional[str] = None,
customThreadFormat: Optional[str] = None,
):
- args_dict = {"program": program}
+ args_dict: dict[str, Any] = {"program": program}
if args:
args_dict["args"] = args
if cwd:
@@ -924,15 +995,20 @@ def request_launch(
args_dict["displayExtendedBacktrace"] = displayExtendedBacktrace
if commandEscapePrefix is not None:
args_dict["commandEscapePrefix"] = commandEscapePrefix
- command_dict = {"command": "launch", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ command_dict: Request = {
+ "type": "request",
+ "seq": 0,
+ "command": "launch",
+ "arguments": args_dict,
+ }
+ return self._send_recv(command_dict)
def request_next(self, threadId, granularity="statement"):
if self.exit_status is not None:
raise ValueError("request_continue called after process exited")
args_dict = {"threadId": threadId, "granularity": granularity}
command_dict = {"command": "next", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_stepIn(self, threadId, targetId, granularity="statement"):
if self.exit_status is not None:
@@ -945,7 +1021,7 @@ def request_stepIn(self, threadId, targetId, granularity="statement"):
"granularity": granularity,
}
command_dict = {"command": "stepIn", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_stepInTargets(self, frameId):
if self.exit_status is not None:
@@ -956,14 +1032,14 @@ def request_stepInTargets(self, frameId):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_stepOut(self, threadId):
if self.exit_status is not None:
raise ValueError("request_stepOut called after process exited")
args_dict = {"threadId": threadId}
command_dict = {"command": "stepOut", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_pause(self, threadId=None):
if self.exit_status is not None:
@@ -972,12 +1048,12 @@ def request_pause(self, threadId=None):
threadId = self.get_thread_id()
args_dict = {"threadId": threadId}
command_dict = {"command": "pause", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_scopes(self, frameId):
args_dict = {"frameId": frameId}
command_dict = {"command": "scopes", "type": "request", "arguments": args_dict}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_setBreakpoints(self, source: Source, line_array, data=None):
"""data is array of parameters for breakpoints in line_array.
@@ -1008,12 +1084,13 @@ def request_setBreakpoints(self, source: Source, line_array, data=None):
breakpoints.append(bp)
args_dict["breakpoints"] = breakpoints
- command_dict = {
+ command_dict: Request = {
"command": "setBreakpoints",
"type": "request",
+ "seq": 0,
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_setExceptionBreakpoints(self, filters):
args_dict = {"filters": filters}
@@ -1022,7 +1099,7 @@ def request_setExceptionBreakpoints(self, filters):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_setFunctionBreakpoints(self, names, condition=None, hitCondition=None):
breakpoints = []
@@ -1039,7 +1116,7 @@ def request_setFunctionBreakpoints(self, names, condition=None, hitCondition=Non
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_dataBreakpointInfo(
self, variablesReference, name, frameIndex=0, threadId=None
@@ -1057,7 +1134,7 @@ def request_dataBreakpointInfo(
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_setDataBreakpoint(self, dataBreakpoints):
"""dataBreakpoints is a list of dictionary with following fields:
@@ -1074,7 +1151,7 @@ def request_setDataBreakpoint(self, dataBreakpoints):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_compileUnits(self, moduleId):
args_dict = {"moduleId": moduleId}
@@ -1083,7 +1160,7 @@ def request_compileUnits(self, moduleId):
"type": "request",
"arguments": args_dict,
}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
return response
def request_completions(self, text, frameId=None):
@@ -1095,10 +1172,10 @@ def request_completions(self, text, frameId=None):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_modules(self):
- return self.send_recv({"command": "modules", "type": "request"})
+ return self._send_recv({"command": "modules", "type": "request"})
def request_stackTrace(
self, threadId=None, startFrame=None, levels=None, format=None, dump=False
@@ -1117,7 +1194,7 @@ def request_stackTrace(
"type": "request",
"arguments": args_dict,
}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
if dump:
for idx, frame in enumerate(response["body"]["stackFrames"]):
name = frame["name"]
@@ -1143,7 +1220,7 @@ def request_source(self, sourceReference):
"sourceReference": sourceReference,
},
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_threads(self):
"""Request a list of all threads and combine any information from any
@@ -1151,7 +1228,7 @@ def request_threads(self):
thread actually stopped. Returns an array of thread dictionaries
with information about all threads"""
command_dict = {"command": "threads", "type": "request", "arguments": {}}
- response = self.send_recv(command_dict)
+ response = self._send_recv(command_dict)
body = response["body"]
# Fill in "self.threads" correctly so that clients that call
# self.get_threads() or self.get_thread_id(...) can get information
@@ -1188,7 +1265,7 @@ def request_variables(
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_setVariable(self, containingVarRef, name, value, id=None):
args_dict = {
@@ -1203,7 +1280,7 @@ def request_setVariable(self, containingVarRef, name, value, id=None):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_locations(self, locationReference):
args_dict = {
@@ -1214,7 +1291,7 @@ def request_locations(self, locationReference):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def request_testGetTargetBreakpoints(self):
"""A request packet used in the LLDB test suite to get all currently
@@ -1226,7 +1303,7 @@ def request_testGetTargetBreakpoints(self):
"type": "request",
"arguments": {},
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
def terminate(self):
self.send.close()
@@ -1246,7 +1323,7 @@ def request_setInstructionBreakpoints(self, memory_reference=[]):
"type": "request",
"arguments": args_dict,
}
- return self.send_recv(command_dict)
+ return self._send_recv(command_dict)
class DebugAdapterServer(DebugCommunication):
@@ -1255,7 +1332,7 @@ def __init__(
executable: Optional[str] = None,
connection: Optional[str] = None,
init_commands: list[str] = [],
- log_file: Optional[TextIO] = None,
+ log_file: Optional[str] = None,
env: Optional[dict[str, str]] = None,
):
self.process = None
@@ -1293,7 +1370,7 @@ def launch(
*,
executable: str,
env: Optional[dict[str, str]] = None,
- log_file: Optional[TextIO] = None,
+ log_file: Optional[str] = None,
connection: Optional[str] = None,
) -> tuple[subprocess.Popen, Optional[str]]:
adapter_env = os.environ.copy()
@@ -1696,7 +1773,7 @@ def main():
executable=options.vscode_path, connection=options.connection
)
if options.debug:
- raw_input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
+ input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
if options.replay:
dbg.replay_packets(options.replay)
else:
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
index 91ae55977046b..1ce640bd5a631 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
@@ -3,8 +3,7 @@
from typing import Optional
import uuid
-import dap_server
-from dap_server import Source
+from dap_server import DebugAdapterServer, Source, Response
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbplatformutil
import lldbgdbserverutils
@@ -26,7 +25,7 @@ def create_debug_adapter(
is_exe(self.lldbDAPExec), "lldb-dap must exist and be executable"
)
log_file_path = self.getBuildArtifact("dap.txt")
- self.dap_server = dap_server.DebugAdapterServer(
+ self.dap_server = DebugAdapterServer(
executable=self.lldbDAPExec,
connection=connection,
init_commands=self.setUpCommands(),
@@ -104,6 +103,17 @@ def waitUntil(self, condition_callback):
time.sleep(0.5)
return False
+ def assertResponseSuccess(self, response: Response):
+ self.assertIsNotNone(response)
+ if not response.get("success", False):
+ cmd = response.get("command", "<not set>")
+ msg = f"command ({cmd}) failed"
+ if "message" in response:
+ msg += " " + str(response["message"])
+ if "body" in response and response["body"] and "error" in response["body"]:
+ msg += " " + str(response["body"]["error"]["format"])
+ self.fail(msg)
+
def verify_breakpoint_hit(self, breakpoint_ids, timeout=DEFAULT_TIMEOUT):
"""Wait for the process we are debugging to stop, and verify we hit
any breakpoint location in the "breakpoint_ids" array.
diff --git a/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py b/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
index 824ed8fe3bb97..c183a465d53f2 100644
--- a/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
+++ b/lldb/test/API/tools/lldb-dap/cancel/TestDAP_cancel.py
@@ -11,15 +11,14 @@
class TestDAP_cancel(lldbdap_testcase.DAPTestCaseBase):
def send_async_req(self, command: str, arguments={}) -> int:
- seq = self.dap_server.sequence
- self.dap_server.send_packet(
+ return self.dap_server.send_packet(
{
"type": "request",
+ "seq": 0,
"command": command,
"arguments": arguments,
}
)
- return seq
def async_blocking_request(self, duration: float) -> int:
"""
@@ -54,21 +53,17 @@ def test_pending_request(self):
pending_seq = self.async_blocking_request(duration=self.DEFAULT_TIMEOUT / 2)
cancel_seq = self.async_cancel(requestId=pending_seq)
- blocking_resp = self.dap_server.recv_packet(filter_type=["response"])
- self.assertEqual(blocking_resp["request_seq"], blocking_seq)
- self.assertEqual(blocking_resp["command"], "evaluate")
- self.assertEqual(blocking_resp["success"], True)
+ blocking_resp = self.dap_server.receive_response(blocking_seq)
+ self.assertResponseSuccess(blocking_resp)
- pending_resp = self.dap_server.recv_packet(filter_type=["response"])
+ pending_resp = self.dap_server.receive_response(pending_seq)
self.assertEqual(pending_resp["request_seq"], pending_seq)
self.assertEqual(pending_resp["command"], "evaluate")
self.assertEqual(pending_resp["success"], False)
self.assertEqual(pending_resp["message"], "cancelled")
- cancel_resp = self.dap_server.recv_packet(filter_type=["response"])
- self.assertEqual(cancel_resp["request_seq"], cancel_seq)
- self.assertEqual(cancel_resp["command"], "cancel")
- self.assertEqual(cancel_resp["success"], True)
+ cancel_resp = self.dap_server.receive_response(cancel_seq)
+ self.assertResponseSuccess(cancel_resp)
self.continue_to_exit()
def test_inflight_request(self):
@@ -86,14 +81,12 @@ def test_inflight_request(self):
)
cancel_seq = self.async_cancel(requestId=blocking_seq)
- blocking_resp = self.dap_server.recv_packet(filter_type=["response"])
+ blocking_resp = self.dap_server.receive_response(blocking_seq)
self.assertEqual(blocking_resp["request_seq"], blocking_seq)
self.assertEqual(blocking_resp["command"], "evaluate")
self.assertEqual(blocking_resp["success"], False)
self.assertEqual(blocking_resp["message"], "cancelled")
- cancel_resp = self.dap_server.recv_packet(filter_type=["response"])
- self.assertEqual(cancel_resp["request_seq"], cancel_seq)
- self.assertEqual(cancel_resp["command"], "cancel")
- self.assertEqual(cancel_resp["success"], True)
+ cancel_resp = self.dap_server.receive_response(cancel_seq)
+ self.assertResponseSuccess(cancel_resp)
self.continue_to_exit()
>From 57b9ce596e683b92855b95d1755a5bf1ba12ae77 Mon Sep 17 00:00:00 2001
From: John Harrison <harjohn at google.com>
Date: Wed, 28 May 2025 13:27:36 -0700
Subject: [PATCH 2/2] Addressing review feedback and tweaking the `**kwargs`
params to be typed.
To validate the types:
```
$ pip3 install mypy
$ mypy --python-version 3.8 \
lldb/packages/Python/lldbsuite/**/*.py \
lldb/test/API/tools/lldb-dap/**/*.py
```
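For context, typing the `**kwargs` against a TypedDict is what lets mypy check launch/attach call sites. A reduced illustration of the `Unpack[TypedDict]` approach (field list trimmed, standalone names, not the actual dap_server definitions):
```
from typing import List
from typing_extensions import TypedDict, Unpack


class LaunchArguments(TypedDict, total=False):
    # Trimmed-down stand-in for the real LaunchArguments TypedDict.
    program: str
    args: List[str]
    stopOnEntry: bool
    initCommands: List[str]


def request_launch(**kwargs: Unpack[LaunchArguments]) -> dict:
    # mypy checks keyword names and value types at every call site.
    return {"command": "launch", "arguments": dict(kwargs)}


request_launch(program="a.out", stopOnEntry=True)      # OK
# request_launch(program="a.out", stopOnEnty=True)     # flagged by mypy
# request_launch(program="a.out", stopOnEntry="yes")   # flagged by mypy
```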
---
.../test/tools/lldb-dap/dap_server.py | 317 ++++++++----------
.../test/tools/lldb-dap/lldbdap_testcase.py | 42 ++-
.../lldb-dap/commands/TestDAP_commands.py | 8 +-
.../console/TestDAP_redirection_to_console.py | 2 +-
.../lldb-dap/variables/TestDAP_variables.py | 6 +-
5 files changed, 167 insertions(+), 208 deletions(-)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index cc235f4fe8b1a..70ad3dfe1970e 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -13,48 +13,97 @@
import threading
import time
from typing import (
+ IO,
Any,
Optional,
Union,
- BinaryIO,
+ List, # required for python 3.8 compatibility
+ Dict, # required for python 3.8 compatibility
+ Tuple, # required for python 3.8 compatibility
TextIO,
TypedDict,
Literal,
Callable,
TypeVar,
+ Generic,
+ cast,
)
+from typing_extensions import Unpack
## DAP type references
+T = TypeVar("T")
-class Event(TypedDict):
+
+class Event(TypedDict, Generic[T]):
type: Literal["event"]
seq: Literal[0]
event: str
- body: Optional[dict]
+ body: Optional[T]
-class Request(TypedDict):
+class Request(TypedDict, Generic[T]):
type: Literal["request"]
seq: int
command: str
- arguments: Optional[dict]
+ arguments: Optional[T]
-class Response(TypedDict):
+class Response(TypedDict, Generic[T]):
type: Literal["response"]
seq: Literal[0]
request_seq: int
success: bool
command: str
message: Optional[str]
- body: Optional[dict]
+ body: Optional[T]
+
+
+class AttachOrLaunchArguments(TypedDict, total=False):
+ stopOnEntry: bool
+ disableASLR: bool
+ disableSTDIO: bool
+ enableAutoVariableSummaries: bool
+ displayExtendedBacktrace: bool
+ enableSyntheticChildDebugging: bool
+ initCommands: List[str]
+ preRunCommands: List[str]
+ postRunCommands: List[str]
+ stopCommands: List[str]
+ exitCommands: List[str]
+ terminateCommands: List[str]
+ sourceMap: Union[List[Tuple[str, str]], Dict[str, str]]
+ sourcePath: str
+ debuggerRoot: str
+ commandEscapePrefix: str
+ customFrameFormat: str
+ customThreadFormat: str
+
+
+class LaunchArguments(AttachOrLaunchArguments, total=False):
+ program: str
+ args: List[str]
+ cwd: str
+ env: Dict[str, str]
+ shellExpandArguments: bool
+ runInTerminal: bool
+ launchCommands: List[str]
+
+
+class AttachArguments(AttachOrLaunchArguments, total=False):
+ program: str
+ pid: int
+ waitFor: bool
+ attachCommands: List[str]
+ coreFile: str
+ gdbRemotePort: int
+ gdbRemoteHostname: str
-_T = TypeVar("_T")
ProtocolMessage = Union[Event, Request, Response]
+
# An internal type used for tracking protocol messages and an EOF sentinel
-# value. 'None' cannot easily be used as a sentinel because it is a falsey
+# value. 'None' cannot easily be used as a sentinel because it is a falsy
# value. When returned outside of DebugCommunication an EOFError is typically
# converted into 'None'.
_InternalProtocolMessage = Union[Event, Request, Response, EOFError]
@@ -98,7 +147,7 @@ def dump_memory(base_addr, data, num_per_line, outfile):
def read_packet(
- f: BinaryIO, verbose=False, trace_file=None
+ f: IO[bytes], verbose=False, trace_file=None
) -> _InternalProtocolMessage:
"""Decode a JSON packet that starts with the content length and is
followed by the JSON bytes from a file 'f'. Returns None on EOF.
@@ -161,8 +210,8 @@ def __init__(
def __str__(self):
return f"Source(name={self.name}, path={self.path}), source_reference={self.source_reference})"
- def as_dict(self) -> dict:
- source_dict: dict[str, Any] = {}
+ def as_dict(self) -> Dict:
+ source_dict: Dict[str, Any] = {}
if self._name is not None:
source_dict["name"] = self._name
if self._path is not None:
@@ -175,9 +224,9 @@ def as_dict(self) -> dict:
class DebugCommunication(object):
def __init__(
self,
- recv: BinaryIO,
- send: BinaryIO,
- init_commands: list[str],
+ recv: IO[bytes],
+ send: IO[bytes],
+ init_commands: List[str],
log_file: Optional[str] = None,
):
# For debugging test failures, try setting `trace_file = sys.stderr`.
@@ -185,7 +234,7 @@ def __init__(
self.log_file = log_file
self.send = send
self.recv = recv
- self.recv_packets: list[_InternalProtocolMessage] = []
+ self.recv_packets: List[_InternalProtocolMessage] = []
self.recv_condition = threading.Condition()
self.recv_thread = threading.Thread(target=self._read_packet_thread)
self.exit_status: Optional[int] = None
@@ -193,18 +242,18 @@ def __init__(
self.initialize_body = None
self.initialized = False
self.configuration_done_sent = False
- self.process_event_body: Optional[dict] = None
+ self.process_event_body: Optional[Dict] = None
self.terminated = False
- self.progress_events: list[Event] = []
- self.reverse_requests: list[Request] = []
+ self.progress_events: List[Event] = []
+ self.reverse_requests: List[Request] = []
self.sequence = 1
self.output_condition = threading.Condition()
- self.output: dict[str, list[str]] = {}
+ self.output: Dict[str, List[str]] = {}
# debuggee state
self.threads = None
- self.thread_stop_reasons: dict[str, Any] = {}
- self.frame_scopes: dict[str, Any] = {}
+ self.thread_stop_reasons: Dict[str, Any] = {}
+ self.frame_scopes: Dict[str, Any] = {}
self.recv_thread.start()
@@ -213,10 +262,10 @@ def encode_content(cls, s: str) -> bytes:
return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@classmethod
- def validate_response(cls, command: Request, response: Response):
- if command["command"] != response["command"]:
+ def validate_response(cls, request: Request, response: Response):
+ if request["command"] != response["command"]:
raise ValueError("command mismatch in response")
- if command["seq"] != response["request_seq"]:
+ if request["seq"] != response["request_seq"]:
raise ValueError("seq mismatch in response")
def _read_packet_thread(self):
@@ -403,27 +452,33 @@ def _handle_reverse_request(self, request: Request):
desc = 'unknown reverse request "%s"' % (request["command"])
raise ValueError(desc)
- def send_packet(self, command_dict: ProtocolMessage) -> int:
+ def send_packet(self, packet: ProtocolMessage) -> int:
"""Take the "command_dict" python dictionary and encode it as a JSON
string and send the contents as a packet to the VSCode debug
- adapter."""
- seq = 0
- # Set the sequence ID for this command automatically
- if command_dict["type"] == "request":
- seq = command_dict["seq"] = self.sequence
+ adapter.
+
+ Returns the seq of the packet."""
+
+ # Set the seq for requests.
+ if packet["type"] == "request":
+ packet["seq"] = self.sequence
self.sequence += 1
else:
- command_dict["seq"] = 0
+ packet["seq"] = 0
+
# Encode our command dictionary as a JSON string
- json_str = json.dumps(command_dict, separators=(",", ":"))
+ json_str = json.dumps(packet, separators=(",", ":"))
+
if self.trace_file:
self.trace_file.write("to adapter:\n%s\n" % (json_str))
+
length = len(json_str)
if length > 0:
# Send the encoded JSON packet and flush the 'send' file
self.send.write(self.encode_content(json_str))
self.send.flush()
- return seq
+
+ return packet["seq"]
def receive_response(
self,
@@ -439,25 +494,25 @@ def predicate(p: Response):
def _recv_packet(
self,
*,
- predicate: Callable[[_T], bool],
+ predicate: Callable[[T], bool],
timeout: Optional[float] = None,
- ) -> Optional[_T]:
+ ) -> Optional[T]:
"""Get a JSON packet from the VSCode debug adapter. This function
assumes a thread that reads packets is running and will deliver
any received packets by calling handle_recv_packet(...). This
function will wait for the packet to arrive and return it when
it does."""
- with self.recv_condition:
- def _predicate():
- return next(
- filter(
- lambda p: isinstance(p, EOFError) or predicate(p),
- self.recv_packets,
- ),
- None,
- )
+ def _predicate():
+ return next(
+ filter(
+ lambda p: isinstance(p, EOFError) or predicate(p),
+ self.recv_packets,
+ ),
+ None,
+ )
+ with self.recv_condition:
packet = self.recv_condition.wait_for(_predicate, timeout=timeout)
if packet is None: # Timeout
return None
@@ -479,7 +534,7 @@ def _send_recv(self, command: Request) -> Optional[Response]:
return response
def wait_for_event(
- self, filter: list[str] = [], timeout: Optional[float] = None
+ self, filter: List[str] = [], timeout: Optional[float] = None
) -> Optional[Event]:
"""Wait for the first event that matches the filter."""
@@ -490,7 +545,7 @@ def predicate(p: Event):
def wait_for_stopped(
self, timeout: Optional[float] = None
- ) -> Optional[list[Event]]:
+ ) -> Optional[List[Event]]:
stopped_events = []
stopped_event = self.wait_for_event(
filter=["stopped", "exited"], timeout=timeout
@@ -509,7 +564,7 @@ def wait_for_stopped(
return stopped_events
def wait_for_breakpoint_events(self, timeout: Optional[float] = None):
- breakpoint_events: list[Event] = []
+ breakpoint_events: List[Event] = []
while True:
event = self.wait_for_event(["breakpoint"], timeout=timeout)
if not event:
@@ -685,62 +740,19 @@ def replay_packets(self, replay_file_path):
print("error: didn't get a valid response")
mode = "invalid"
- def request_attach(
- self,
- *,
- program: Optional[str] = None,
- pid: Optional[int] = None,
- waitFor=False,
- initCommands: Optional[list[str]] = None,
- preRunCommands: Optional[list[str]] = None,
- attachCommands: Optional[list[str]] = None,
- postRunCommands: Optional[list[str]] = None,
- stopCommands: Optional[list[str]] = None,
- exitCommands: Optional[list[str]] = None,
- terminateCommands: Optional[list[str]] = None,
- coreFile: Optional[str] = None,
- stopOnEntry=False,
- sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = None,
- gdbRemotePort: Optional[int] = None,
- gdbRemoteHostname: Optional[str] = None,
- ):
- args_dict: dict[str, Any] = {}
- if pid is not None:
- args_dict["pid"] = pid
- if program is not None:
- args_dict["program"] = program
- if waitFor:
- args_dict["waitFor"] = waitFor
- args_dict["initCommands"] = self.init_commands
- if initCommands:
- args_dict["initCommands"].extend(initCommands)
- if preRunCommands:
- args_dict["preRunCommands"] = preRunCommands
- if stopCommands:
- args_dict["stopCommands"] = stopCommands
- if exitCommands:
- args_dict["exitCommands"] = exitCommands
- if terminateCommands:
- args_dict["terminateCommands"] = terminateCommands
- if attachCommands:
- args_dict["attachCommands"] = attachCommands
- if coreFile:
- args_dict["coreFile"] = coreFile
- if stopOnEntry:
- args_dict["stopOnEntry"] = stopOnEntry
- if postRunCommands:
- args_dict["postRunCommands"] = postRunCommands
- if sourceMap:
- args_dict["sourceMap"] = sourceMap
- if gdbRemotePort is not None:
- args_dict["gdb-remote-port"] = gdbRemotePort
- if gdbRemoteHostname is not None:
- args_dict["gdb-remote-hostname"] = gdbRemoteHostname
+ def request_attach(self, **kwargs: Unpack[AttachArguments]):
+ attach_args = cast(AttachArguments, {k: v for k, v in kwargs.items() if v})
+ attach_args.setdefault("disableASLR", True)
+ attach_args.setdefault("initCommands", [])
+ attach_args["initCommands"] = [
+ *self.init_commands,
+ *attach_args["initCommands"],
+ ]
command_dict: Request = {
"command": "attach",
"type": "request",
"seq": 0,
- "arguments": args_dict,
+ "arguments": attach_args,
}
return self._send_recv(command_dict)
@@ -919,87 +931,20 @@ def request_initialize(self, sourceInitFile=False):
self.initialize_body = response["body"]
return response
- def request_launch(
- self,
- program: str,
- *,
- args: Optional[list[str]] = None,
- cwd: Optional[str] = None,
- env: Optional[dict[str, str]] = None,
- stopOnEntry=False,
- disableASLR=True,
- disableSTDIO=False,
- shellExpandArguments=False,
- runInTerminal=False,
- enableAutoVariableSummaries=False,
- displayExtendedBacktrace=False,
- enableSyntheticChildDebugging=False,
- initCommands: Optional[list[str]] = None,
- preRunCommands: Optional[list[str]] = None,
- launchCommands: Optional[list[str]] = None,
- postRunCommands: Optional[list[str]] = None,
- stopCommands: Optional[list[str]] = None,
- exitCommands: Optional[list[str]] = None,
- terminateCommands: Optional[list[str]] = None,
- sourceMap: Optional[Union[list[tuple[str, str]], dict[str, str]]] = None,
- sourcePath: Optional[str] = None,
- debuggerRoot: Optional[str] = None,
- commandEscapePrefix: Optional[str] = None,
- customFrameFormat: Optional[str] = None,
- customThreadFormat: Optional[str] = None,
- ):
- args_dict: dict[str, Any] = {"program": program}
- if args:
- args_dict["args"] = args
- if cwd:
- args_dict["cwd"] = cwd
- if env:
- args_dict["env"] = env
- if stopOnEntry:
- args_dict["stopOnEntry"] = stopOnEntry
- if disableSTDIO:
- args_dict["disableSTDIO"] = disableSTDIO
- if shellExpandArguments:
- args_dict["shellExpandArguments"] = shellExpandArguments
- args_dict["initCommands"] = self.init_commands
- if initCommands:
- args_dict["initCommands"].extend(initCommands)
- if preRunCommands:
- args_dict["preRunCommands"] = preRunCommands
- if stopCommands:
- args_dict["stopCommands"] = stopCommands
- if exitCommands:
- args_dict["exitCommands"] = exitCommands
- if terminateCommands:
- args_dict["terminateCommands"] = terminateCommands
- if sourcePath:
- args_dict["sourcePath"] = sourcePath
- if debuggerRoot:
- args_dict["debuggerRoot"] = debuggerRoot
- if launchCommands:
- args_dict["launchCommands"] = launchCommands
- if sourceMap:
- args_dict["sourceMap"] = sourceMap
- if runInTerminal:
- args_dict["runInTerminal"] = runInTerminal
- if postRunCommands:
- args_dict["postRunCommands"] = postRunCommands
- if customFrameFormat:
- args_dict["customFrameFormat"] = customFrameFormat
- if customThreadFormat:
- args_dict["customThreadFormat"] = customThreadFormat
-
- args_dict["disableASLR"] = disableASLR
- args_dict["enableAutoVariableSummaries"] = enableAutoVariableSummaries
- args_dict["enableSyntheticChildDebugging"] = enableSyntheticChildDebugging
- args_dict["displayExtendedBacktrace"] = displayExtendedBacktrace
- if commandEscapePrefix is not None:
- args_dict["commandEscapePrefix"] = commandEscapePrefix
+ def request_launch(self, **kwargs: Unpack[LaunchArguments]):
+ launch_args = cast(LaunchArguments, {k: v for k, v in kwargs.items() if v})
+ launch_args.setdefault("disableASLR", True)
+ launch_args.setdefault("initCommands", [])
+ launch_args["initCommands"] = [
+ *self.init_commands,
+ *launch_args["initCommands"],
+ ]
+
command_dict: Request = {
"type": "request",
"seq": 0,
"command": "launch",
- "arguments": args_dict,
+ "arguments": kwargs,
}
return self._send_recv(command_dict)
@@ -1331,11 +1276,11 @@ def __init__(
self,
executable: Optional[str] = None,
connection: Optional[str] = None,
- init_commands: list[str] = [],
+ init_commands: List[str] = [],
log_file: Optional[str] = None,
- env: Optional[dict[str, str]] = None,
+ env: Optional[Dict[str, str]] = None,
):
- self.process = None
+ self.process: Optional[subprocess.Popen[bytes]] = None
self.connection = None
if executable is not None:
process, connection = DebugAdapterServer.launch(
@@ -1359,7 +1304,7 @@ def __init__(
self, s.makefile("rb"), s.makefile("wb"), init_commands, log_file
)
self.connection = connection
- else:
+ elif self.process and self.process.stdout and self.process.stdin:
DebugCommunication.__init__(
self, self.process.stdout, self.process.stdin, init_commands, log_file
)
@@ -1369,10 +1314,10 @@ def launch(
cls,
*,
executable: str,
- env: Optional[dict[str, str]] = None,
+ env: Optional[Dict[str, str]] = None,
log_file: Optional[str] = None,
connection: Optional[str] = None,
- ) -> tuple[subprocess.Popen, Optional[str]]:
+ ) -> Tuple[subprocess.Popen[bytes], Optional[str]]:
adapter_env = os.environ.copy()
if env is not None:
adapter_env.update(env)
@@ -1396,6 +1341,8 @@ def launch(
if connection is None:
return (process, None)
+ assert process.stdout
+
# lldb-dap will print the listening address once the listener is
# made to stdout. The listener is formatted like
# `connection://host:port` or `unix-connection:///path`.
@@ -1410,7 +1357,9 @@ def launch(
)
# If the listener expanded into multiple addresses, use the first.
- connection = out.removeprefix(expected_prefix).rstrip("\r\n").split(",", 1)[0]
+ if out.startswith(expected_prefix):
+ out = out[len(expected_prefix) :]
+ connection = out.rstrip("\r\n").split(",", 1)[0]
return (process, connection)
diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
index 1ce640bd5a631..4ff34f4b18ffd 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py
@@ -1,9 +1,16 @@
import os
import time
-from typing import Optional
+from typing import Dict, Optional
+from typing_extensions import Unpack
import uuid
-from dap_server import DebugAdapterServer, Source, Response
+from dap_server import (
+ DebugAdapterServer,
+ Source,
+ Response,
+ AttachArguments,
+ LaunchArguments,
+)
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbplatformutil
import lldbgdbserverutils
@@ -17,7 +24,7 @@ class DAPTestCaseBase(TestBase):
def create_debug_adapter(
self,
- lldbDAPEnv: Optional[dict[str, str]] = None,
+ env: Optional[Dict[str, str]] = None,
connection: Optional[str] = None,
):
"""Create the Visual Studio Code debug adapter"""
@@ -30,16 +37,16 @@ def create_debug_adapter(
connection=connection,
init_commands=self.setUpCommands(),
log_file=log_file_path,
- env=lldbDAPEnv,
+ env=env,
)
def build_and_create_debug_adapter(
self,
- lldbDAPEnv: Optional[dict[str, str]] = None,
- dictionary: Optional[dict] = None,
+ adapter_env: Optional[Dict[str, str]] = None,
+ dictionary: Optional[Dict] = None,
):
self.build(dictionary=dictionary)
- self.create_debug_adapter(lldbDAPEnv)
+ self.create_debug_adapter(adapter_env)
def build_and_create_debug_adapter_for_attach(self):
"""Variant of build_and_create_debug_adapter that builds a uniquely
@@ -105,6 +112,7 @@ def waitUntil(self, condition_callback):
def assertResponseSuccess(self, response: Response):
self.assertIsNotNone(response)
+ self.assertIn("success", response)
if not response.get("success", False):
cmd = response.get("command", "<not set>")
msg = f"command ({cmd}) failed"
@@ -391,7 +399,7 @@ def attach(
disconnectAutomatically=True,
sourceInitFile=False,
expectFailure=False,
- **kwargs,
+ **kwargs: Unpack[AttachArguments],
):
"""Build the default Makefile target, create the DAP debug adapter,
and attach to the process.
@@ -418,12 +426,13 @@ def cleanup():
def launch(
self,
- program=None,
+ program: str,
+ /,
*,
sourceInitFile=False,
disconnectAutomatically=True,
expectFailure=False,
- **kwargs,
+ **kwargs: Unpack[LaunchArguments],
):
"""Sending launch request to dap"""
@@ -439,7 +448,8 @@ def cleanup():
# Initialize and launch the program
self.dap_server.request_initialize(sourceInitFile)
- response = self.dap_server.request_launch(program, **kwargs)
+ kwargs["program"] = program
+ response = self.dap_server.request_launch(**kwargs)
if expectFailure:
return response
if not (response and response["success"]):
@@ -450,17 +460,17 @@ def cleanup():
def build_and_launch(
self,
- program,
+ program: str,
+ /,
*,
- lldbDAPEnv: Optional[dict[str, str]] = None,
- **kwargs,
+ adapter_env: Optional[Dict[str, str]] = None,
+ **kwargs: Unpack[LaunchArguments],
):
"""Build the default Makefile target, create the DAP debug adapter,
and launch the process.
"""
- self.build_and_create_debug_adapter(lldbDAPEnv)
+ self.build_and_create_debug_adapter(adapter_env)
self.assertTrue(os.path.exists(program), "executable must exist")
-
return self.launch(program, **kwargs)
def getBuiltinDebugServerTool(self):
diff --git a/lldb/test/API/tools/lldb-dap/commands/TestDAP_commands.py b/lldb/test/API/tools/lldb-dap/commands/TestDAP_commands.py
index ea6b2ea7f28ab..10edce3620fcf 100644
--- a/lldb/test/API/tools/lldb-dap/commands/TestDAP_commands.py
+++ b/lldb/test/API/tools/lldb-dap/commands/TestDAP_commands.py
@@ -44,10 +44,10 @@ def do_test_abort_on_error(
commands = ["?!" + command_quiet, "!" + command_abort_on_error]
self.build_and_launch(
program,
- initCommands=commands if use_init_commands else None,
- launchCommands=commands if use_launch_commands else None,
- preRunCommands=commands if use_pre_run_commands else None,
- postRunCommands=commands if use_post_run_commands else None,
+ initCommands=commands if use_init_commands else [],
+ launchCommands=commands if use_launch_commands else [],
+ preRunCommands=commands if use_pre_run_commands else [],
+ postRunCommands=commands if use_post_run_commands else [],
expectFailure=True,
)
full_output = self.collect_console(
diff --git a/lldb/test/API/tools/lldb-dap/console/TestDAP_redirection_to_console.py b/lldb/test/API/tools/lldb-dap/console/TestDAP_redirection_to_console.py
index e367c327d4295..b6af471146338 100644
--- a/lldb/test/API/tools/lldb-dap/console/TestDAP_redirection_to_console.py
+++ b/lldb/test/API/tools/lldb-dap/console/TestDAP_redirection_to_console.py
@@ -16,7 +16,7 @@ def test(self):
"""
program = self.getBuildArtifact("a.out")
self.build_and_launch(
- program, lldbDAPEnv={"LLDB_DAP_TEST_STDOUT_STDERR_REDIRECTION": ""}
+ program, adapter_env={"LLDB_DAP_TEST_STDOUT_STDERR_REDIRECTION": "1"}
)
source = "main.cpp"
diff --git a/lldb/test/API/tools/lldb-dap/variables/TestDAP_variables.py b/lldb/test/API/tools/lldb-dap/variables/TestDAP_variables.py
index 340be0b39010d..3a7921007c4d2 100644
--- a/lldb/test/API/tools/lldb-dap/variables/TestDAP_variables.py
+++ b/lldb/test/API/tools/lldb-dap/variables/TestDAP_variables.py
@@ -104,7 +104,7 @@ def verify_variables(self, verify_dict, variables, varref_dict=None):
)
self.verify_values(verify_dict[name], variable, varref_dict)
- def darwin_dwarf_missing_obj(self, initCommands):
+ def darwin_dwarf_missing_obj(self, initCommands=[]):
self.build(debug_info="dwarf")
program = self.getBuildArtifact("a.out")
main_obj = self.getBuildArtifact("main.o")
@@ -791,13 +791,13 @@ def test_darwin_dwarf_missing_obj(self):
"""
Test that if we build a binary with DWARF in .o files and we remove
the .o file for main.cpp, that we get a variable named "<error>"
- whose value matches the appriopriate error. Errors when getting
+ whose value matches the appropriate error. Errors when getting
variables are returned in the LLDB API when the user should be
notified of issues that can easily be solved by rebuilding or
changing compiler options and are designed to give better feedback
to the user.
"""
- self.darwin_dwarf_missing_obj(None)
+ self.darwin_dwarf_missing_obj([])
@no_debug_info_test
@skipUnlessDarwin