[Lldb-commits] [lldb] [lldb-dap] Refactoring DebugCommunication to improve test consistency. (PR #143818)
Ebuka Ezike via lldb-commits
lldb-commits at lists.llvm.org
Mon Jun 16 07:19:04 PDT 2025
================
@@ -189,262 +298,322 @@ def _read_packet_thread(self):
while not done:
packet = read_packet(self.recv, trace_file=self.trace_file)
# `packet` will be `None` on EOF. We want to pass it down to
- # handle_recv_packet anyway so the main thread can handle unexpected
- # termination of lldb-dap and stop waiting for new packets.
+ # handle_recv_packet anyway so the main thread can handle
+ # unexpected termination of lldb-dap and stop waiting for new
+ # packets.
done = not self._handle_recv_packet(packet)
finally:
dump_dap_log(self.log_file)
- def get_modules(self):
- module_list = self.request_modules()["body"]["modules"]
- modules = {}
- for module in module_list:
- modules[module["name"]] = module
- return modules
+ def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
+ """Handles an incoming packet.
- def get_output(self, category, timeout=0.0, clear=True):
- self.output_condition.acquire()
- output = None
- if category in self.output:
- output = self.output[category]
- if clear:
- del self.output[category]
- elif timeout != 0.0:
- self.output_condition.wait(timeout)
- if category in self.output:
- output = self.output[category]
- if clear:
- del self.output[category]
- self.output_condition.release()
- return output
+ Called by the read thread that is waiting for all incoming packets
+ to store the incoming packet in "self._recv_packets" in a thread safe
+ way. This function will then signal the "self._recv_condition" to
+ indicate a new packet is available.
- def collect_output(self, category, timeout_secs, pattern, clear=True):
- end_time = time.time() + timeout_secs
- collected_output = ""
- while end_time > time.time():
- output = self.get_output(category, timeout=0.25, clear=clear)
- if output:
- collected_output += output
- if pattern is not None and pattern in output:
- break
- return collected_output if collected_output else None
-
- def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
- self.recv_condition.acquire()
- self.recv_packets.append(packet)
- self.recv_condition.notify()
- self.recv_condition.release()
+ Args:
+ packet: A new packet to store.
- def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
- """Called by the read thread that is waiting for all incoming packets
- to store the incoming packet in "self.recv_packets" in a thread safe
- way. This function will then signal the "self.recv_condition" to
- indicate a new packet is available. Returns True if the caller
- should keep calling this function for more packets.
+ Returns:
+ True if the caller should keep calling this function for more
+ packets.
"""
- # If EOF, notify the read thread by enqueuing a None.
- if not packet:
- self._enqueue_recv_packet(None)
- return False
-
- # Check the packet to see if is an event packet
- keepGoing = True
- packet_type = packet["type"]
- if packet_type == "event":
- event = packet["event"]
- body = None
- if "body" in packet:
- body = packet["body"]
- # Handle the event packet and cache information from these packets
- # as they come in
- if event == "output":
- # Store any output we receive so clients can retrieve it later.
- category = body["category"]
- output = body["output"]
- self.output_condition.acquire()
- if category in self.output:
- self.output[category] += output
- else:
- self.output[category] = output
- self.output_condition.notify()
- self.output_condition.release()
- # no need to add 'output' event packets to our packets list
- return keepGoing
- elif event == "initialized":
- self.initialized = True
- elif event == "process":
- # When a new process is attached or launched, remember the
- # details that are available in the body of the event
- self.process_event_body = body
- elif event == "exited":
- # Process exited, mark the status to indicate the process is not
- # alive.
- self.exit_status = body["exitCode"]
- elif event == "continued":
- # When the process continues, clear the known threads and
- # thread_stop_reasons.
- all_threads_continued = body.get("allThreadsContinued", True)
- tid = body["threadId"]
- if tid in self.thread_stop_reasons:
- del self.thread_stop_reasons[tid]
- self._process_continued(all_threads_continued)
- elif event == "stopped":
- # Each thread that stops with a reason will send a
- # 'stopped' event. We need to remember the thread stop
- # reasons since the 'threads' command doesn't return
- # that information.
- self._process_stopped()
- tid = body["threadId"]
- self.thread_stop_reasons[tid] = body
- elif event.startswith("progress"):
- # Progress events come in as 'progressStart', 'progressUpdate',
- # and 'progressEnd' events. Keep these around in case test
- # cases want to verify them.
- self.progress_events.append(packet)
- elif event == "breakpoint":
- # Breakpoint events are sent when a breakpoint is resolved
- self._update_verified_breakpoints([body["breakpoint"]])
- elif event == "capabilities":
- # Update the capabilities with new ones from the event.
- self.capabilities.update(body["capabilities"])
-
- elif packet_type == "response":
- if packet["command"] == "disconnect":
- keepGoing = False
- self._enqueue_recv_packet(packet)
- return keepGoing
+ with self._recv_condition:
+ self._recv_packets.append(packet)
+ self._recv_condition.notify()
+ # packet is None on EOF
+ return packet is not None and not (
+ packet["type"] == "response" and packet["command"] == "disconnect"
+ )
+
+ def _recv_packet(
+ self,
+ *,
+ predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
+ timeout: Optional[float] = None,
+ ) -> Optional[ProtocolMessage]:
+ """Processes recived packets from the adapter.
+
+ Updates the DebugCommunication stateful properties based on the received
+ packets in the order they are received.
+
+ NOTE: The only time the session state properties should be updated is
+ during this call to ensure consistency during tests.
+
+ Args:
+ predicate:
+ Optional, if specified, returns the first packet that matches
+ the given predicate.
+ timeout:
+ Optional, if specified, processes packets until either the
+ timeout occurs or the predicate matches a packet, whichever
+ occurs first.
+
+ Returns:
+ The first matching packet for the given predicate, if specified,
+ otherwise None.
+ """
+ assert (
+ threading.current_thread != self._recv_thread
+ ), "Must not be called from the _recv_thread"
+
+ def process_until_match():
+ self._process_recv_packets()
+ for i, packet in enumerate(self._pending_packets):
+ if packet is None:
+ # We need to return a truthy value to break out of the
+ # wait_for, use `EOFError` as an indicator of EOF.
+ return EOFError()
+ if predicate and predicate(packet):
+ self._pending_packets.pop(i)
+ return packet
+
+ with self._recv_condition:
+ packet = self._recv_condition.wait_for(process_until_match, timeout)
+ return None if isinstance(packet, EOFError) else packet
+
+ def _process_recv_packets(self) -> None:
+ """Process received packets, updating the session state."""
+ with self._recv_condition:
+ for packet in self._recv_packets:
+ # Handle events that may modify any stateful properties of
+ # the DAP session.
+ if packet and packet["type"] == "event":
+ self._handle_event(packet)
+ elif packet and packet["type"] == "request":
+ # Handle reverse requests and keep processing.
+ self._handle_reverse_request(packet)
+ # Move the packet to the pending queue.
+ self._pending_packets.append(packet)
+ self._recv_packets.clear()
+
+ def _handle_event(self, packet: Event) -> None:
+ """Handle any events that modify debug session state we track."""
+ event = packet["event"]
+ body: Optional[Dict] = packet.get("body", None)
+
+ if event == "output":
+ # Store any output we receive so clients can retrieve it later.
+ category = body["category"]
+ output = body["output"]
+ if category in self.output:
+ self.output[category] += output
+ else:
+ self.output[category] = output
+ elif event == "initialized":
+ self.initialized = True
+ elif event == "process":
+ # When a new process is attached or launched, remember the
+ # details that are available in the body of the event
+ self.process_event_body = body
+ elif event == "exited":
+ # Process exited, mark the status to indicate the process is not
+ # alive.
+ self.exit_status = body["exitCode"]
+ elif event == "continued":
+ # When the process continues, clear the known threads and
+ # thread_stop_reasons.
+ all_threads_continued = (
+ body.get("allThreadsContinued", True) if body else True
+ )
+ tid = body["threadId"]
+ if tid in self.thread_stop_reasons:
+ del self.thread_stop_reasons[tid]
+ self._process_continued(all_threads_continued)
+ elif event == "stopped":
+ # Each thread that stops with a reason will send a
+ # 'stopped' event. We need to remember the thread stop
+ # reasons since the 'threads' command doesn't return
+ # that information.
+ self._process_stopped()
+ tid = body["threadId"]
+ self.thread_stop_reasons[tid] = body
+ elif event.startswith("progress"):
+ # Progress events come in as 'progressStart', 'progressUpdate',
+ # and 'progressEnd' events. Keep these around in case test
+ # cases want to verify them.
+ self.progress_events.append(packet)
+ elif event == "breakpoint":
+ # Breakpoint events are sent when a breakpoint is resolved
+ self._update_verified_breakpoints([body["breakpoint"]])
+ elif event == "capabilities":
+ # Update the capabilities with new ones from the event.
+ self.capabilities.update(body["capabilities"])
+
+ def _handle_reverse_request(self, request: Request) -> None:
+ if request in self.reverse_requests:
+ return
+ self.reverse_requests.append(request)
+ arguments = request.get("arguments")
+ if request["command"] == "runInTerminal" and arguments is not None:
+ in_shell = arguments.get("argsCanBeInterpretedByShell", False)
+ proc = subprocess.Popen(
+ arguments["args"],
+ env=arguments.get("env", {}),
+ cwd=arguments["cwd"],
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ shell=in_shell,
+ )
+ body = {}
+ if in_shell:
+ body["shellProcessId"] = proc.pid
+ else:
+ body["processId"] = proc.pid
+ self.send_packet(
+ {
+ "type": "response",
+ "seq": 0,
+ "request_seq": request["seq"],
+ "success": True,
+ "command": "runInTerminal",
+ "message": None,
+ "body": body,
+ }
+ )
+ elif request["command"] == "startDebugging":
+ self.send_packet(
+ {
+ "type": "response",
+ "seq": 0,
+ "request_seq": request["seq"],
+ "success": True,
+ "message": None,
+ "command": "startDebugging",
+ "body": {},
+ }
+ )
+ else:
+ desc = 'unknown reverse request "%s"' % (request["command"])
+ raise ValueError(desc)
def _process_continued(self, all_threads_continued: bool):
self.frame_scopes = {}
if all_threads_continued:
self.thread_stop_reasons = {}
- def _update_verified_breakpoints(self, breakpoints: list[Event]):
+ def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]):
for breakpoint in breakpoints:
- if "id" in breakpoint:
- self.resolved_breakpoints[str(breakpoint["id"])] = breakpoint.get(
- "verified", False
- )
+ # If no id is set, we cannot correlate the given breakpoint across
+ # requests, ignore it.
+ if "id" not in breakpoint:
+ continue
+
+ self.resolved_breakpoints[str(breakpoint["id"])] = breakpoint.get(
+ "verified", False
+ )
- def send_packet(self, command_dict: Request, set_sequence=True):
+ def _send_recv(self, request: Request) -> Optional[Response]:
+ """Send a command python dictionary as JSON and receive the JSON
+ response. Validates that the response is the correct sequence and
+ command in the reply. Any events that are received are added to the
+ events list in this object"""
+ seq = self.send_packet(request)
+ response = self.receive_response(seq)
+ if response is None:
+ desc = 'no response for "%s"' % (request["command"])
+ raise ValueError(desc)
+ self.validate_response(request, response)
+ return response
+
+ def send_packet(self, packet: ProtocolMessage) -> int:
"""Take the "command_dict" python dictionary and encode it as a JSON
string and send the contents as a packet to the VSCode debug
- adapter"""
- # Set the sequence ID for this command automatically
- if set_sequence:
- command_dict["seq"] = self.sequence
+ adapter.
+
+ Returns the seq of the packet."""
+ # Set the seq for requests.
+ if packet["type"] == "request":
+ packet["seq"] = self.sequence
self.sequence += 1
+ else:
+ packet["seq"] = 0
+
# Encode our command dictionary as a JSON string
- json_str = json.dumps(command_dict, separators=(",", ":"))
+ json_str = json.dumps(packet, separators=(",", ":"))
+
if self.trace_file:
self.trace_file.write("to adapter:\n%s\n" % (json_str))
+
length = len(json_str)
if length > 0:
# Send the encoded JSON packet and flush the 'send' file
self.send.write(self.encode_content(json_str))
self.send.flush()
- def recv_packet(
- self,
- filter_type: Optional[str] = None,
- filter_event: Optional[Union[str, list[str]]] = None,
- timeout: Optional[float] = None,
- ) -> Optional[ProtocolMessage]:
- """Get a JSON packet from the VSCode debug adapter. This function
- assumes a thread that reads packets is running and will deliver
- any received packets by calling handle_recv_packet(...). This
- function will wait for the packet to arrive and return it when
- it does."""
- while True:
- try:
- self.recv_condition.acquire()
- packet = None
- while True:
- for i, curr_packet in enumerate(self.recv_packets):
- if not curr_packet:
- raise EOFError
- packet_type = curr_packet["type"]
- if filter_type is None or packet_type in filter_type:
- if filter_event is None or (
- packet_type == "event"
- and curr_packet["event"] in filter_event
- ):
- packet = self.recv_packets.pop(i)
- break
- if packet:
- break
- # Sleep until packet is received
- len_before = len(self.recv_packets)
- self.recv_condition.wait(timeout)
- len_after = len(self.recv_packets)
- if len_before == len_after:
- return None # Timed out
- return packet
- except EOFError:
- return None
- finally:
- self.recv_condition.release()
-
- def send_recv(self, command):
- """Send a command python dictionary as JSON and receive the JSON
- response. Validates that the response is the correct sequence and
- command in the reply. Any events that are received are added to the
- events list in this object"""
- self.send_packet(command)
- done = False
- while not done:
- response_or_request = self.recv_packet(filter_type=["response", "request"])
- if response_or_request is None:
- desc = 'no response for "%s"' % (command["command"])
- raise ValueError(desc)
- if response_or_request["type"] == "response":
- self.validate_response(command, response_or_request)
- return response_or_request
- else:
- self.reverse_requests.append(response_or_request)
- if response_or_request["command"] == "runInTerminal":
- subprocess.Popen(
- response_or_request["arguments"]["args"],
- env=response_or_request["arguments"]["env"],
- )
- self.send_packet(
- {
- "type": "response",
- "request_seq": response_or_request["seq"],
- "success": True,
- "command": "runInTerminal",
- "body": {},
- },
- )
- elif response_or_request["command"] == "startDebugging":
- self.send_packet(
- {
- "type": "response",
- "request_seq": response_or_request["seq"],
- "success": True,
- "command": "startDebugging",
- "body": {},
- },
- )
- else:
- desc = 'unknown reverse request "%s"' % (
- response_or_request["command"]
- )
- raise ValueError(desc)
+ return packet["seq"]
- return None
+ def receive_response(self, seq: int) -> Optional[Response]:
+ """Waits for the a response with the associated request_sec."""
+
+ def predicate(p: ProtocolMessage):
+ return p["type"] == "response" and p["request_seq"] == seq
+
+ return cast(Optional[Response], self._recv_packet(predicate=predicate))
+
+ def get_modules(self):
+ modules = {}
+ resp = self.request_modules()
+ if resp["success"]:
+ module_list = resp["body"]["modules"]
+ for module in module_list:
+ modules[module["name"]] = module
+ return modules
----------------
da-viper wrote:
This should throw, since we are expecting a value.
https://github.com/llvm/llvm-project/pull/143818
More information about the lldb-commits
mailing list